This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_fixes_7 in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1e32b39f9edcb5bfb6c1ed87713f578f4c26a817 Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Oct 14 12:38:09 2025 +0200 fixes --- core/bench/src/actors/producer/client/low_level.rs | 2 +- .../consumer_offsets/delete_consumer_offset.rs | 2 +- .../consumer_offsets/get_consumer_offset.rs | 2 +- .../consumer_offsets/store_consumer_offset.rs | 2 +- core/common/src/commands/messages/poll_messages.rs | 2 +- core/integration/src/test_server.rs | 48 +++++++----- .../data_integrity/verify_after_server_restart.rs | 91 ++++++++++++++++------ core/integration/tests/mcp/mod.rs | 20 ++--- .../tests/server/scenarios/encryption_scenario.rs | 8 +- .../tests/server/scenarios/tcp_tls_scenario.rs | 2 +- core/server/server.http | 2 +- core/server/src/bootstrap.rs | 22 +++--- core/server/src/quic/quic_server.rs | 3 + core/server/src/shard/mod.rs | 6 +- core/server/src/shard/system/messages.rs | 19 ++++- core/server/src/slab/streams.rs | 9 ++- core/server/src/tcp/tcp_listener.rs | 3 + core/server/src/tcp/tcp_tls_listener.rs | 3 + 18 files changed, 164 insertions(+), 82 deletions(-) diff --git a/core/bench/src/actors/producer/client/low_level.rs b/core/bench/src/actors/producer/client/low_level.rs index 86d022648..fc2f483ed 100644 --- a/core/bench/src/actors/producer/client/low_level.rs +++ b/core/bench/src/actors/producer/client/low_level.rs @@ -49,7 +49,7 @@ impl LowLevelProducerClient { client: None, stream_id: Identifier::default(), topic_id: Identifier::default(), - partitioning: Partitioning::partition_id(1), + partitioning: Partitioning::partition_id(0), } } } diff --git a/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs b/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs index aab6c096d..8cfeac24d 100644 --- a/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs +++ b/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs @@ -54,7 +54,7 @@ impl Default for DeleteConsumerOffset { consumer: Consumer::default(), stream_id: Identifier::default(), topic_id: Identifier::default(), - partition_id: Some(1), + partition_id: Some(0), } } } diff --git a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs index c2a3dda8e..a98ac6c4d 100644 --- a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs +++ b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs @@ -67,7 +67,7 @@ impl Command for GetConsumerOffset { } fn default_partition_id() -> Option<u32> { - Some(1) + Some(0) } impl Validatable<IggyError> for GetConsumerOffset { diff --git a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs index ad3c4eae6..9500138d1 100644 --- a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs +++ b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs @@ -57,7 +57,7 @@ impl Default for StoreConsumerOffset { consumer: Consumer::default(), stream_id: Identifier::default(), topic_id: Identifier::default(), - partition_id: Some(1), + partition_id: Some(0), offset: 0, } } diff --git a/core/common/src/commands/messages/poll_messages.rs b/core/common/src/commands/messages/poll_messages.rs index 3f4045bbd..e51b03ef6 100644 --- a/core/common/src/commands/messages/poll_messages.rs +++ b/core/common/src/commands/messages/poll_messages.rs @@ -24,7 +24,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use std::fmt::Display; -pub const DEFAULT_PARTITION_ID: u32 = 1; +pub const DEFAULT_PARTITION_ID: u32 = 0; pub const DEFAULT_NUMBER_OF_MESSAGES_TO_POLL: u32 = 10; /// `PollMessages` command is used to poll messages from a topic in a stream. diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs index 7914c4c2e..160094d26 100644 --- a/core/integration/src/test_server.rs +++ b/core/integration/src/test_server.rs @@ -408,33 +408,41 @@ impl TestServer { }); if let Some(config) = config { - let quic_addr: SocketAddr = config.quic.address.parse().unwrap(); - if quic_addr.port() == 0 { - panic!("Quic address port is 0!"); + // Only validate and add enabled protocols + if config.quic.enabled { + let quic_addr: SocketAddr = config.quic.address.parse().unwrap(); + if quic_addr.port() == 0 { + panic!("Quic address port is 0!"); + } + self.server_addrs + .push(ServerProtocolAddr::QuicUdp(quic_addr)); } - let tcp_addr: SocketAddr = config.tcp.address.parse().unwrap(); - if tcp_addr.port() == 0 { - panic!("Tcp address port is 0!"); + if config.tcp.enabled { + let tcp_addr: SocketAddr = config.tcp.address.parse().unwrap(); + if tcp_addr.port() == 0 { + panic!("Tcp address port is 0!"); + } + self.server_addrs.push(ServerProtocolAddr::RawTcp(tcp_addr)); } - let http_addr: SocketAddr = config.http.address.parse().unwrap(); - if http_addr.port() == 0 { - panic!("Http address port is 0!"); + if config.http.enabled { + let http_addr: SocketAddr = config.http.address.parse().unwrap(); + if http_addr.port() == 0 { + panic!("Http address port is 0!"); + } + self.server_addrs + .push(ServerProtocolAddr::HttpTcp(http_addr)); } - let websocket_addr: SocketAddr = config.websocket.address.parse().unwrap(); - if websocket_addr.port() == 0 { - panic!("WebSocket address port is 0!"); + if config.websocket.enabled { + let websocket_addr: SocketAddr = config.websocket.address.parse().unwrap(); + if websocket_addr.port() == 0 { + panic!("WebSocket address port is 0!"); + } + self.server_addrs + .push(ServerProtocolAddr::WebSocket(websocket_addr)); } - - self.server_addrs - .push(ServerProtocolAddr::QuicUdp(quic_addr)); - self.server_addrs.push(ServerProtocolAddr::RawTcp(tcp_addr)); - self.server_addrs - .push(ServerProtocolAddr::HttpTcp(http_addr)); - self.server_addrs - .push(ServerProtocolAddr::WebSocket(websocket_addr)); } else { panic!( "Failed to load config from file {config_path} in {MAX_PORT_WAIT_DURATION_S} s!" diff --git a/core/integration/tests/data_integrity/verify_after_server_restart.rs b/core/integration/tests/data_integrity/verify_after_server_restart.rs index 17e029e0d..10414364c 100644 --- a/core/integration/tests/data_integrity/verify_after_server_restart.rs +++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs @@ -28,10 +28,6 @@ use serial_test::parallel; use std::{collections::HashMap, str::FromStr}; use test_case::test_matrix; -/* - * Helper functions for test matrix parameters - */ - fn cache_open_segment() -> &'static str { "open_segment" } @@ -85,17 +81,6 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) amount_of_data_to_process, ); - let default_bench_stream_identifiers: [Identifier; 8] = [ - Identifier::numeric(3000001).unwrap(), - Identifier::numeric(3000002).unwrap(), - Identifier::numeric(3000003).unwrap(), - Identifier::numeric(3000004).unwrap(), - Identifier::numeric(3000005).unwrap(), - Identifier::numeric(3000006).unwrap(), - Identifier::numeric(3000007).unwrap(), - Identifier::numeric(3000008).unwrap(), - ]; - // 4. Connect and login to newly started server let client = TcpClientFactory { server_addr, @@ -103,12 +88,15 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) } .create_client() .await; + let client = IggyClient::create(client, None, None); login_root(&client).await; - let topic_id = Identifier::numeric(1).unwrap(); - for stream_id in default_bench_stream_identifiers { + + let topic_id = Identifier::numeric(0).unwrap(); + for i in 0..7 { + let stream_id = Identifier::numeric(i).unwrap(); client - .flush_unsaved_buffer(&stream_id, &topic_id, 1, false) + .flush_unsaved_buffer(&stream_id, &topic_id, 0, true) .await .unwrap(); } @@ -133,7 +121,50 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) test_server.start(); let server_addr = test_server.get_raw_tcp_addr().unwrap(); - // 8. Run send bench again to add more data + // 8. Verify stats are preserved after restart (before adding more data) + let client_after_restart = IggyClient::create( + TcpClientFactory { + server_addr: server_addr.clone(), + ..Default::default() + } + .create_client() + .await, + None, + None, + ); + login_root(&client_after_restart).await; + + let stats_after_restart = client_after_restart.get_stats().await.unwrap(); + assert_eq!( + expected_messages_count, stats_after_restart.messages_count, + "Messages count should be preserved after restart (before: {}, after: {})", + expected_messages_count, stats_after_restart.messages_count + ); + assert_eq!( + expected_messages_size_bytes.as_bytes_usize(), + stats_after_restart.messages_size_bytes.as_bytes_usize(), + "Messages size bytes should be preserved after restart (before: {}, after: {})", + expected_messages_size_bytes.as_bytes_usize(), + stats_after_restart.messages_size_bytes.as_bytes_usize() + ); + assert_eq!( + expected_streams_count, stats_after_restart.streams_count, + "Streams count should be preserved after restart" + ); + assert_eq!( + expected_topics_count, stats_after_restart.topics_count, + "Topics count should be preserved after restart" + ); + assert_eq!( + expected_partitions_count, stats_after_restart.partitions_count, + "Partitions count should be preserved after restart" + ); + assert_eq!( + expected_segments_count, stats_after_restart.segments_count, + "Segments count should be preserved after restart" + ); + + // 9. Run send bench again to add more data run_bench_and_wait_for_finish( &server_addr, &TransportProtocol::Tcp, @@ -141,7 +172,7 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) amount_of_data_to_process, ); - // 9. Run poll bench again to check if all data is still there + // 10. Run poll bench again to check if all data is still there run_bench_and_wait_for_finish( &server_addr, &TransportProtocol::Tcp, @@ -149,7 +180,7 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2), ); - // 10. Connect and login to newly started server + // 11. Connect and login to newly started server let client = IggyClient::create( TcpClientFactory { server_addr: server_addr.clone(), @@ -162,7 +193,17 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) ); login_root(&client).await; - // 11. Save stats from the second server (should have double the data) + // 12. Flush unsaved buffer + let topic_id = Identifier::numeric(0).unwrap(); + for i in 0..7 { + let stream_id = Identifier::numeric(i).unwrap(); + client + .flush_unsaved_buffer(&stream_id, &topic_id, 0, true) + .await + .unwrap(); + } + + // 13. Save stats from the second server (should have double the data) let stats = client.get_stats().await.unwrap(); let actual_messages_size_bytes = stats.messages_size_bytes; let actual_streams_count = stats.streams_count; @@ -173,7 +214,7 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) let actual_clients_count = stats.clients_count; let actual_consumer_groups_count = stats.consumer_groups_count; - // 12. Compare stats (expecting double the messages/size after second bench run) + // 14. Compare stats (expecting double the messages/size after second bench run) assert_eq!( expected_messages_size_bytes.as_bytes_usize() * 2, actual_messages_size_bytes.as_bytes_usize(), @@ -206,7 +247,7 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) "Consumer groups count" ); - // 13. Run poll bench to check if all data (10MB total) is still there + // 15. Run poll bench to check if all data (10MB total) is still there run_bench_and_wait_for_finish( &server_addr, &TransportProtocol::Tcp, @@ -214,6 +255,6 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2), ); - // 14. Manual cleanup + // 16. Manual cleanup std::fs::remove_dir_all(local_data_path).unwrap(); } diff --git a/core/integration/tests/mcp/mod.rs b/core/integration/tests/mcp/mod.rs index 635b88990..3ffd8fc51 100644 --- a/core/integration/tests/mcp/mod.rs +++ b/core/integration/tests/mcp/mod.rs @@ -179,7 +179,7 @@ async fn mcp_server_should_create_topic() { "create_topic", Some(json!({ "stream_id": STREAM_NAME, "name": name, "partitions_count": 1})), |topic| { - assert_eq!(topic.id, 2); + assert_eq!(topic.id, 1); assert_eq!(topic.name, name); assert_eq!(topic.partitions_count, 1); assert_eq!(topic.messages_count, 0); @@ -244,7 +244,7 @@ async fn mcp_server_should_delete_partitions() { async fn mcp_server_should_delete_segments() { assert_empty_response( "delete_segments", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "segments_count": 1 })), + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "segments_count": 1 })), ) .await; } @@ -254,7 +254,7 @@ async fn mcp_server_should_delete_segments() { async fn mcp_server_should_poll_messages() { assert_response::<PolledMessages>( "poll_messages", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "offset": 0 })), + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })), |messages| { assert_eq!(messages.messages.len(), 1); let message = &messages.messages[0]; @@ -271,7 +271,7 @@ async fn mcp_server_should_poll_messages() { async fn mcp_server_should_send_messages() { assert_empty_response( "send_messages", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "messages": [ + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "messages": [ { "payload": "test" } @@ -375,11 +375,11 @@ async fn mcp_server_should_delete_consumer_group() { async fn mcp_server_should_return_consumer_offset() { assert_response::<Option<ConsumerOffsetInfo>>( "get_consumer_offset", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1 })), + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0 })), |offset| { assert!(offset.is_some()); let offset = offset.unwrap(); - assert_eq!(offset.partition_id, 1); + assert_eq!(offset.partition_id, 0); assert_eq!(offset.stored_offset, 0); assert_eq!(offset.current_offset, 0); }, @@ -392,7 +392,7 @@ async fn mcp_server_should_return_consumer_offset() { async fn mcp_server_should_store_consumer_offset() { assert_empty_response( "store_consumer_offset", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "offset": 0 })), + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })), ) .await; } @@ -402,7 +402,7 @@ async fn mcp_server_should_store_consumer_offset() { async fn mcp_server_should_delete_consumer_offset() { assert_empty_response( "delete_consumer_offset", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "offset": 0 })), + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })), ) .await; } @@ -630,7 +630,7 @@ async fn seed_data(iggy_server_address: &str) { .send_messages( &STREAM_ID, &TOPIC_ID, - &Partitioning::partition_id(1), + &Partitioning::partition_id(0), &mut messages, ) .await @@ -640,7 +640,7 @@ async fn seed_data(iggy_server_address: &str) { Consumer::new(Identifier::named(CONSUMER_NAME).expect("Failed to create consumer")); iggy_client - .store_consumer_offset(&consumer, &STREAM_ID, &TOPIC_ID, Some(1), 0) + .store_consumer_offset(&consumer, &STREAM_ID, &TOPIC_ID, Some(0), 0) .await .expect("Failed to store consumer offset"); diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs b/core/integration/tests/server/scenarios/encryption_scenario.rs index 88c115851..77a81a6c1 100644 --- a/core/integration/tests/server/scenarios/encryption_scenario.rs +++ b/core/integration/tests/server/scenarios/encryption_scenario.rs @@ -130,7 +130,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp .send_messages( &Identifier::numeric(stream_id).unwrap(), &Identifier::numeric(topic_id).unwrap(), - &Partitioning::partition_id(1), + &Partitioning::partition_id(0), &mut messages_batch_1, ) .await @@ -160,7 +160,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp .poll_messages( &Identifier::numeric(stream_id).unwrap(), &Identifier::numeric(topic_id).unwrap(), - Some(1), + Some(0), &consumer, &PollingStrategy::offset(0), messages_per_batch.try_into().unwrap(), @@ -275,7 +275,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp .send_messages( &Identifier::numeric(stream_id).unwrap(), &Identifier::numeric(topic_id).unwrap(), - &Partitioning::partition_id(1), // Use specific partition for testing + &Partitioning::partition_id(0), // Use specific partition for testing &mut messages_batch_2, ) .await @@ -300,7 +300,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp .poll_messages( &Identifier::numeric(stream_id).unwrap(), &Identifier::numeric(topic_id).unwrap(), - Some(1), + Some(0), &consumer, &PollingStrategy::offset(0), messages_per_batch as u32 * 2, diff --git a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs index bac2b1858..4aa3e70c1 100644 --- a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs +++ b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs @@ -69,7 +69,7 @@ pub async fn run(client: &IggyClient) { .poll_messages( &Identifier::named(stream_name).unwrap(), &Identifier::named(topic_name).unwrap(), - Some(1), + Some(0), &Consumer::default(), &PollingStrategy::offset(0), 1, diff --git a/core/server/server.http b/core/server/server.http index cb46d4213..19ceb0861 100644 --- a/core/server/server.http +++ b/core/server/server.http @@ -18,7 +18,7 @@ @url = http://localhost:3000 @stream_id = 1 @topic_id = 1 -@partition_id = 1 +@partition_id = 0 @consumer_group_id = 1 @consumer_id = 1 @client_id = 1 diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index 5beb6f41e..a103e8ae1 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -38,7 +38,7 @@ use crate::{ streams::stream2, topics::{consumer_group2, topic2}, users::user::User, - utils::file::overwrite, + utils::{crypto, file::overwrite}, }, versioning::SemanticVersion, }; @@ -48,8 +48,8 @@ use error_set::ErrContext; use iggy_common::{ ConsumerKind, IggyByteSize, IggyError, defaults::{ - DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, - MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH, + DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH, + MIN_USERNAME_LENGTH, }, }; use std::{collections::HashSet, env, path::Path, sync::Arc}; @@ -289,9 +289,8 @@ pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError> } pub fn create_root_user() -> User { - info!("Creating root user..."); - let username = env::var(IGGY_ROOT_USERNAME_ENV); - let password = env::var(IGGY_ROOT_PASSWORD_ENV); + let mut username = env::var(IGGY_ROOT_USERNAME_ENV); + let mut password = env::var(IGGY_ROOT_PASSWORD_ENV); if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) { panic!( "When providing the custom root user credentials, both username and password must be set." @@ -300,11 +299,15 @@ pub fn create_root_user() -> User { if username.is_ok() && password.is_ok() { info!("Using the custom root user credentials."); } else { - info!("Using the default root user credentials."); + info!("Using the default root user credentials..."); + username = Ok(DEFAULT_ROOT_USERNAME.to_string()); + let generated_password = crypto::generate_secret(20..40); + println!("Generated root user password: {generated_password}"); + password = Ok(generated_password); } - let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string()); - let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string()); + let username = username.expect("Root username is not set."); + let password = password.expect("Root password is not set."); if username.is_empty() || password.is_empty() { panic!("Root user credentials are not set."); } @@ -320,6 +323,7 @@ pub fn create_root_user() -> User { if password.len() > MAX_PASSWORD_LENGTH { panic!("Root password is too long."); } + User::root(&username, &password) } diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 1a67d64d8..c5033213a 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -127,6 +127,9 @@ pub async fn spawn_quic_server( shard.quic_bound_address.set(Some(actual_addr)); if addr.port() == 0 { + // Notify config writer on shard 0 + let _ = shard.config_writer_notify.try_send(()); + // Broadcast to other shards for SO_REUSEPORT binding let event = ShardEvent::AddressBound { protocol: iggy_common::TransportProtocol::Quic, diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 0996e1fc1..328578eee 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -135,7 +135,7 @@ pub struct IggyShard { pub(crate) quic_bound_address: Cell<Option<SocketAddr>>, pub(crate) websocket_bound_address: Cell<Option<SocketAddr>>, pub(crate) http_bound_address: Cell<Option<SocketAddr>>, - config_writer_notify: async_channel::Sender<()>, + pub(crate) config_writer_notify: async_channel::Sender<()>, config_writer_receiver: async_channel::Receiver<()>, pub(crate) task_registry: Rc<TaskRegistry>, } @@ -847,9 +847,9 @@ impl IggyShard { stream_id, topic_id, partition_id, - .. + fsync, } => { - self.flush_unsaved_buffer_bypass_auth(&stream_id, &topic_id, partition_id) + self.flush_unsaved_buffer_bypass_auth(&stream_id, &topic_id, partition_id, fsync) .await?; Ok(()) } diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 7c543c3b7..bc04de427 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -281,9 +281,11 @@ impl IggyShard { stream_id: &Identifier, topic_id: &Identifier, partition_id: usize, - _fsync: bool, + fsync: bool, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; + self.ensure_partition_exists(stream_id, topic_id, partition_id)?; + let numeric_stream_id = self .streams2 .with_stream_by_id(stream_id, streams::helpers::get_stream_id()); @@ -304,7 +306,7 @@ impl IggyShard { format!("{COMPONENT} (error: {error}) - permission denied to append messages for user {} on stream ID: {}, topic ID: {}", session.get_user_id(), numeric_stream_id as u32, numeric_topic_id as u32) })?; - self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id) + self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id, fsync) .await?; Ok(()) } @@ -314,8 +316,10 @@ impl IggyShard { stream_id: &Identifier, topic_id: &Identifier, partition_id: usize, + fsync: bool, ) -> Result<(), IggyError> { - self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id) + self.ensure_partition_exists(stream_id, topic_id, partition_id)?; + self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id, fsync) .await } @@ -324,6 +328,7 @@ impl IggyShard { stream_id: &Identifier, topic_id: &Identifier, partition_id: usize, + fsync: bool, ) -> Result<(), IggyError> { let batches = self.streams2.with_partition_by_id_mut( stream_id, @@ -342,6 +347,14 @@ impl IggyShard { &self.config.system, ) .await?; + + // Ensure all data is flushed to disk before returning + if fsync { + self.streams2 + .fsync_all_messages(stream_id, topic_id, partition_id) + .await?; + } + Ok(()) } diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 0ad76809c..7dccec27f 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -1337,9 +1337,16 @@ impl Streams { partition_id: usize, ) -> Result<(), IggyError> { let storage = self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { - log.active_storage().clone() + if !log.has_segments() { + return None; + } + Some(log.active_storage().clone()) }); + let Some(storage) = storage else { + return Ok(()); + }; + if storage.messages_writer.is_none() || storage.index_writer.is_none() { return Ok(()); } diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index 580b7a60e..049c5b7f0 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -103,6 +103,9 @@ pub async fn start( shard.tcp_bound_address.set(Some(actual_addr)); if addr.port() == 0 { + // Notify config writer on shard 0 + let _ = shard.config_writer_notify.try_send(()); + // Broadcast to other shards for SO_REUSEPORT binding let event = ShardEvent::AddressBound { protocol: TransportProtocol::Tcp, diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs index 7dc419603..c41233504 100644 --- a/core/server/src/tcp/tcp_tls_listener.rs +++ b/core/server/src/tcp/tcp_tls_listener.rs @@ -72,6 +72,9 @@ pub(crate) async fn start( if shard.id == 0 { shard.tcp_bound_address.set(Some(actual_addr)); if addr.port() == 0 { + // Notify config writer on shard 0 + let _ = shard.config_writer_notify.try_send(()); + let event = ShardEvent::AddressBound { protocol: TransportProtocol::Tcp, address: actual_addr,
