This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_fixes_7 in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 4e09f89f793aead1c709b8900718386a226eb66d Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Oct 14 12:38:09 2025 +0200 fixes --- core/bench/src/actors/producer/client/low_level.rs | 2 +- .../consumer_offsets/delete_consumer_offset.rs | 2 +- .../consumer_offsets/get_consumer_offset.rs | 2 +- .../consumer_offsets/store_consumer_offset.rs | 2 +- core/common/src/commands/messages/poll_messages.rs | 2 +- core/integration/src/test_server.rs | 48 +-- .../data_integrity/verify_after_server_restart.rs | 326 ++++++++++++++------- core/integration/tests/mcp/mod.rs | 20 +- .../tests/server/scenarios/encryption_scenario.rs | 8 +- .../tests/server/scenarios/tcp_tls_scenario.rs | 2 +- core/server/server.http | 2 +- core/server/src/bootstrap.rs | 22 +- core/server/src/quic/quic_server.rs | 3 + core/server/src/shard/mod.rs | 6 +- core/server/src/shard/system/messages.rs | 19 +- core/server/src/slab/streams.rs | 9 +- core/server/src/tcp/tcp_listener.rs | 3 + core/server/src/tcp/tcp_tls_listener.rs | 3 + 18 files changed, 319 insertions(+), 162 deletions(-) diff --git a/core/bench/src/actors/producer/client/low_level.rs b/core/bench/src/actors/producer/client/low_level.rs index 86d022648..fc2f483ed 100644 --- a/core/bench/src/actors/producer/client/low_level.rs +++ b/core/bench/src/actors/producer/client/low_level.rs @@ -49,7 +49,7 @@ impl LowLevelProducerClient { client: None, stream_id: Identifier::default(), topic_id: Identifier::default(), - partitioning: Partitioning::partition_id(1), + partitioning: Partitioning::partition_id(0), } } } diff --git a/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs b/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs index aab6c096d..8cfeac24d 100644 --- a/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs +++ b/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs @@ -54,7 +54,7 @@ impl Default for DeleteConsumerOffset { consumer: Consumer::default(), stream_id: Identifier::default(), topic_id: Identifier::default(), - partition_id: Some(1), + partition_id: Some(0), } } } diff --git a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs index c2a3dda8e..a98ac6c4d 100644 --- a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs +++ b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs @@ -67,7 +67,7 @@ impl Command for GetConsumerOffset { } fn default_partition_id() -> Option<u32> { - Some(1) + Some(0) } impl Validatable<IggyError> for GetConsumerOffset { diff --git a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs index ad3c4eae6..9500138d1 100644 --- a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs +++ b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs @@ -57,7 +57,7 @@ impl Default for StoreConsumerOffset { consumer: Consumer::default(), stream_id: Identifier::default(), topic_id: Identifier::default(), - partition_id: Some(1), + partition_id: Some(0), offset: 0, } } diff --git a/core/common/src/commands/messages/poll_messages.rs b/core/common/src/commands/messages/poll_messages.rs index 3f4045bbd..e51b03ef6 100644 --- a/core/common/src/commands/messages/poll_messages.rs +++ b/core/common/src/commands/messages/poll_messages.rs @@ -24,7 +24,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use std::fmt::Display; -pub const DEFAULT_PARTITION_ID: u32 = 1; +pub const DEFAULT_PARTITION_ID: u32 = 0; pub const DEFAULT_NUMBER_OF_MESSAGES_TO_POLL: u32 = 10; /// `PollMessages` command is used to poll messages from a topic in a stream. diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs index 7914c4c2e..160094d26 100644 --- a/core/integration/src/test_server.rs +++ b/core/integration/src/test_server.rs @@ -408,33 +408,41 @@ impl TestServer { }); if let Some(config) = config { - let quic_addr: SocketAddr = config.quic.address.parse().unwrap(); - if quic_addr.port() == 0 { - panic!("Quic address port is 0!"); + // Only validate and add enabled protocols + if config.quic.enabled { + let quic_addr: SocketAddr = config.quic.address.parse().unwrap(); + if quic_addr.port() == 0 { + panic!("Quic address port is 0!"); + } + self.server_addrs + .push(ServerProtocolAddr::QuicUdp(quic_addr)); } - let tcp_addr: SocketAddr = config.tcp.address.parse().unwrap(); - if tcp_addr.port() == 0 { - panic!("Tcp address port is 0!"); + if config.tcp.enabled { + let tcp_addr: SocketAddr = config.tcp.address.parse().unwrap(); + if tcp_addr.port() == 0 { + panic!("Tcp address port is 0!"); + } + self.server_addrs.push(ServerProtocolAddr::RawTcp(tcp_addr)); } - let http_addr: SocketAddr = config.http.address.parse().unwrap(); - if http_addr.port() == 0 { - panic!("Http address port is 0!"); + if config.http.enabled { + let http_addr: SocketAddr = config.http.address.parse().unwrap(); + if http_addr.port() == 0 { + panic!("Http address port is 0!"); + } + self.server_addrs + .push(ServerProtocolAddr::HttpTcp(http_addr)); } - let websocket_addr: SocketAddr = config.websocket.address.parse().unwrap(); - if websocket_addr.port() == 0 { - panic!("WebSocket address port is 0!"); + if config.websocket.enabled { + let websocket_addr: SocketAddr = config.websocket.address.parse().unwrap(); + if websocket_addr.port() == 0 { + panic!("WebSocket address port is 0!"); + } + self.server_addrs + .push(ServerProtocolAddr::WebSocket(websocket_addr)); } - - self.server_addrs - .push(ServerProtocolAddr::QuicUdp(quic_addr)); - self.server_addrs.push(ServerProtocolAddr::RawTcp(tcp_addr)); - self.server_addrs - .push(ServerProtocolAddr::HttpTcp(http_addr)); - self.server_addrs - .push(ServerProtocolAddr::WebSocket(websocket_addr)); } else { panic!( "Failed to load config from file {config_path} in {MAX_PORT_WAIT_DURATION_S} s!" diff --git a/core/integration/tests/data_integrity/verify_after_server_restart.rs b/core/integration/tests/data_integrity/verify_after_server_restart.rs index 17e029e0d..4c4aae1d6 100644 --- a/core/integration/tests/data_integrity/verify_after_server_restart.rs +++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs @@ -16,21 +16,25 @@ * under the License. */ +use bytes::Bytes; use iggy::clients::client::IggyClient; -use iggy::prelude::{Identifier, IggyByteSize, MessageClient, SystemClient}; -use iggy_common::TransportProtocol; -use integration::bench_utils::run_bench_and_wait_for_finish; +use iggy::prelude::*; use integration::{ tcp_client::TcpClientFactory, test_server::{ClientFactory, IpAddrKind, SYSTEM_PATH_ENV_VAR, TestServer, login_root}, }; use serial_test::parallel; -use std::{collections::HashMap, str::FromStr}; +use std::collections::HashMap; use test_case::test_matrix; -/* - * Helper functions for test matrix parameters - */ +// Constants matching iggy-bench behavior +const NUMBER_OF_STREAMS: u32 = 1; +const TOPIC_ID: u32 = 0; +const PARTITION_ID: u32 = 0; +const PARTITIONS_COUNT: u32 = 1; +const MESSAGE_BATCHES: u64 = 100; +const MESSAGES_PER_BATCH: u64 = 100; +const TOTAL_MESSAGES_PER_STREAM: u64 = MESSAGE_BATCHES * MESSAGES_PER_BATCH; // 10,000 fn cache_open_segment() -> &'static str { "open_segment" @@ -44,7 +48,6 @@ fn cache_none() -> &'static str { "none" } -// TODO(numminex) - Move the message generation method from benchmark run to a special method. #[test_matrix( [cache_open_segment(), cache_all(), cache_none()] )] @@ -66,54 +69,29 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) let mut test_server = TestServer::new(Some(env_vars.clone()), false, None, IpAddrKind::V4); test_server.start(); let server_addr = test_server.get_raw_tcp_addr().unwrap(); - let local_data_path = test_server.get_local_data_path().to_owned(); - - // 2. Run send bench to fill 5 MB of data - let amount_of_data_to_process = IggyByteSize::from_str("5 MB").unwrap(); - run_bench_and_wait_for_finish( - &server_addr, - &TransportProtocol::Tcp, - "pinned-producer", - amount_of_data_to_process, - ); - // 3. Run poll bench to check if everything's OK - run_bench_and_wait_for_finish( - &server_addr, - &TransportProtocol::Tcp, - "pinned-consumer", - amount_of_data_to_process, - ); - - let default_bench_stream_identifiers: [Identifier; 8] = [ - Identifier::numeric(3000001).unwrap(), - Identifier::numeric(3000002).unwrap(), - Identifier::numeric(3000003).unwrap(), - Identifier::numeric(3000004).unwrap(), - Identifier::numeric(3000005).unwrap(), - Identifier::numeric(3000006).unwrap(), - Identifier::numeric(3000007).unwrap(), - Identifier::numeric(3000008).unwrap(), - ]; - - // 4. Connect and login to newly started server + // 2. Connect, login, and create streams/topics let client = TcpClientFactory { - server_addr, + server_addr: server_addr.clone(), ..Default::default() } .create_client() .await; let client = IggyClient::create(client, None, None); login_root(&client).await; - let topic_id = Identifier::numeric(1).unwrap(); - for stream_id in default_bench_stream_identifiers { - client - .flush_unsaved_buffer(&stream_id, &topic_id, 1, false) - .await - .unwrap(); - } - // 5. Save stats from the first server + create_streams_and_topics(&client).await; + + // 3. Produce messages + produce_messages(&client, TOTAL_MESSAGES_PER_STREAM).await; + + // 4. Consume and verify all messages + consume_messages(&client, TOTAL_MESSAGES_PER_STREAM).await; + + // 5. Flush unsaved buffers for all streams + flush_all_streams(&client).await; + + // 6. Save stats from the first server let stats = client.get_stats().await.unwrap(); let expected_messages_size_bytes = stats.messages_size_bytes; let expected_streams_count = stats.streams_count; @@ -124,33 +102,17 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) let expected_clients_count = stats.clients_count; let expected_consumer_groups_count = stats.consumer_groups_count; - // 6. Stop server + // 7. Stop server test_server.stop(); drop(test_server); - // 7. Restart server with same settings + // 8. Restart server with same settings let mut test_server = TestServer::new(Some(env_vars.clone()), false, None, IpAddrKind::V4); test_server.start(); let server_addr = test_server.get_raw_tcp_addr().unwrap(); - // 8. Run send bench again to add more data - run_bench_and_wait_for_finish( - &server_addr, - &TransportProtocol::Tcp, - "pinned-producer", - amount_of_data_to_process, - ); - - // 9. Run poll bench again to check if all data is still there - run_bench_and_wait_for_finish( - &server_addr, - &TransportProtocol::Tcp, - "pinned-consumer", - IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2), - ); - - // 10. Connect and login to newly started server - let client = IggyClient::create( + // 9. Verify stats are preserved after restart (before adding more data) + let client_after_restart = IggyClient::create( TcpClientFactory { server_addr: server_addr.clone(), ..Default::default() @@ -160,60 +122,214 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str) None, None, ); - login_root(&client).await; + login_root(&client_after_restart).await; - // 11. Save stats from the second server (should have double the data) - let stats = client.get_stats().await.unwrap(); - let actual_messages_size_bytes = stats.messages_size_bytes; - let actual_streams_count = stats.streams_count; - let actual_topics_count = stats.topics_count; - let actual_partitions_count = stats.partitions_count; - let actual_segments_count = stats.segments_count; - let actual_messages_count = stats.messages_count; - let actual_clients_count = stats.clients_count; - let actual_consumer_groups_count = stats.consumer_groups_count; - - // 12. Compare stats (expecting double the messages/size after second bench run) + let stats_after_restart = client_after_restart.get_stats().await.unwrap(); assert_eq!( - expected_messages_size_bytes.as_bytes_usize() * 2, - actual_messages_size_bytes.as_bytes_usize(), - "Messages size bytes should be doubled" + expected_messages_count, stats_after_restart.messages_count, + "Messages count should be preserved after restart", + ); + assert_eq!( + expected_messages_size_bytes.as_bytes_usize(), + stats_after_restart.messages_size_bytes.as_bytes_usize(), + "Messages size bytes should be preserved after restart" + ); + assert_eq!( + expected_streams_count, stats_after_restart.streams_count, + "Streams count should be preserved after restart" + ); + assert_eq!( + expected_topics_count, stats_after_restart.topics_count, + "Topics count should be preserved after restart" + ); + assert_eq!( + expected_partitions_count, stats_after_restart.partitions_count, + "Partitions count should be preserved after restart" + ); + assert_eq!( + expected_segments_count, stats_after_restart.segments_count, + "Segments count should be preserved after restart" + ); + + // 10. Verify streams and topics still exist after restart + for stream_id in 0..NUMBER_OF_STREAMS { + let stream = client_after_restart + .get_stream(&Identifier::numeric(stream_id).unwrap()) + .await + .unwrap(); + assert!( + stream.is_some(), + "Stream {} should exist after restart", + stream_id + ); + } + + // 11. Verify we can still consume the first batch after restart + consume_messages(&client_after_restart, TOTAL_MESSAGES_PER_STREAM).await; + + // 12. Produce more messages (second batch) + produce_messages(&client_after_restart, TOTAL_MESSAGES_PER_STREAM).await; + + // 15. Save stats from the second server (should have double the data) + let stats_after_2nd_send = client_after_restart.get_stats().await.unwrap(); + eprintln!("Stats after restart: {:#?}", stats_after_2nd_send); + + // 16. Compare stats (expecting double the messages/size after second bench run) + assert_eq!( + expected_streams_count, stats_after_2nd_send.streams_count, + "Streams count should be the same" + ); + assert_eq!( + expected_topics_count, stats_after_2nd_send.topics_count, + "Topics count should be the same" ); assert_eq!( - expected_streams_count, actual_streams_count, - "Streams count" + expected_partitions_count, stats_after_2nd_send.partitions_count, + "Partitions count should be the same" ); - assert_eq!(expected_topics_count, actual_topics_count, "Topics count"); assert_eq!( - expected_partitions_count, actual_partitions_count, - "Partitions count" + stats_after_2nd_send.segments_count, expected_segments_count, + "Segments count should be the same" ); - assert!( - actual_segments_count >= expected_segments_count, - "Segments count should be at least the same or more" + assert_eq!( + expected_messages_size_bytes.as_bytes_usize() * 2, + stats_after_2nd_send.messages_size_bytes.as_bytes_usize(), + "Messages size bytes should be doubled" ); assert_eq!( expected_messages_count * 2, - actual_messages_count, + stats_after_2nd_send.messages_count, "Messages count should be doubled" ); assert_eq!( - expected_clients_count, actual_clients_count, - "Clients count" + expected_clients_count, stats_after_2nd_send.clients_count, + "Clients count should be the same" ); assert_eq!( - expected_consumer_groups_count, actual_consumer_groups_count, - "Consumer groups count" + expected_consumer_groups_count, stats_after_2nd_send.consumer_groups_count, + "Consumer groups count should be the same" ); - // 13. Run poll bench to check if all data (10MB total) is still there - run_bench_and_wait_for_finish( - &server_addr, - &TransportProtocol::Tcp, - "pinned-consumer", - IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2), - ); + eprintln!("Consuming all messages after restart"); + + // 13. Consume all messages (should be 20K total per stream) + consume_messages(&client_after_restart, TOTAL_MESSAGES_PER_STREAM * 2).await; +} + +/// Create all streams and topics matching iggy-bench behavior +async fn create_streams_and_topics(client: &IggyClient) { + for stream_id in 0..NUMBER_OF_STREAMS { + // Create stream + client.create_stream(&stream_id.to_string()).await.unwrap(); + + // Create topic + client + .create_topic( + &Identifier::numeric(stream_id).unwrap(), + &TOPIC_ID.to_string(), + PARTITIONS_COUNT, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + } +} + +/// Create messages with the specified payload size +fn create_messages(message_payload_size: usize, count: u64) -> Vec<IggyMessage> { + let mut messages = Vec::with_capacity(count as usize); + let payload = Bytes::from(vec![0xAB; message_payload_size]); + + for offset in 0..count { + let id = (offset + 1) as u128; + let message = IggyMessage::builder() + .id(id) + .payload(payload.clone()) + .build() + .expect("Failed to create message"); + messages.push(message); + } + messages +} - // 14. Manual cleanup - std::fs::remove_dir_all(local_data_path).unwrap(); +/// Produce messages to all streams/topics matching iggy-bench behavior +async fn produce_messages(client: &IggyClient, messages_per_stream: u64) { + const MESSAGE_PAYLOAD_SIZE: usize = 100; + for stream_id in 0..NUMBER_OF_STREAMS { + let mut messages = create_messages(MESSAGE_PAYLOAD_SIZE, messages_per_stream); + client + .send_messages( + &Identifier::numeric(stream_id).unwrap(), + &Identifier::numeric(TOPIC_ID).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut messages, + ) + .await + .unwrap(); + } +} + +/// Consume and verify messages from all streams/topics +async fn consume_messages(client: &IggyClient, expected_messages_per_stream: u64) { + for stream_id in 0..NUMBER_OF_STREAMS { + let mut total_messages = 0u64; + let mut offset = 0u64; + + // Poll messages in batches until we get all expected messages + while total_messages < expected_messages_per_stream { + let polled_messages = client + .poll_messages( + &Identifier::numeric(stream_id).unwrap(), + &Identifier::numeric(TOPIC_ID).unwrap(), + Some(PARTITION_ID), + &Consumer::default(), + &PollingStrategy::offset(offset), + 1000, // Poll up to 1000 messages at a time + false, + ) + .await + .unwrap(); + + if polled_messages.messages.is_empty() { + eprintln!( + "Stream {}: No more messages at offset {}, total so far: {}/{}", + stream_id, offset, total_messages, expected_messages_per_stream + ); + break; + } + + let messages_count = polled_messages.messages.len() as u64; + total_messages += messages_count; + eprintln!( + "Stream {}: Polled {} messages at offset {}, total: {}/{}", + stream_id, messages_count, offset, total_messages, expected_messages_per_stream + ); + + // Update offset to the last message's offset + 1 + if let Some(last_msg) = polled_messages.messages.last() { + offset = last_msg.header.offset + 1; + } + } + + assert_eq!( + total_messages, expected_messages_per_stream, + "Expected {} messages in stream {}, but got {}", + expected_messages_per_stream, stream_id, total_messages + ); + } +} + +/// Flush unsaved buffers for all streams +async fn flush_all_streams(client: &IggyClient) { + let topic_id = Identifier::numeric(TOPIC_ID).unwrap(); + for stream_id in 0..NUMBER_OF_STREAMS { + let stream_id = Identifier::numeric(stream_id).unwrap(); + client + .flush_unsaved_buffer(&stream_id, &topic_id, PARTITION_ID, true) + .await + .unwrap(); + } } diff --git a/core/integration/tests/mcp/mod.rs b/core/integration/tests/mcp/mod.rs index 635b88990..3ffd8fc51 100644 --- a/core/integration/tests/mcp/mod.rs +++ b/core/integration/tests/mcp/mod.rs @@ -179,7 +179,7 @@ async fn mcp_server_should_create_topic() { "create_topic", Some(json!({ "stream_id": STREAM_NAME, "name": name, "partitions_count": 1})), |topic| { - assert_eq!(topic.id, 2); + assert_eq!(topic.id, 1); assert_eq!(topic.name, name); assert_eq!(topic.partitions_count, 1); assert_eq!(topic.messages_count, 0); @@ -244,7 +244,7 @@ async fn mcp_server_should_delete_partitions() { async fn mcp_server_should_delete_segments() { assert_empty_response( "delete_segments", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "segments_count": 1 })), + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "segments_count": 1 })), ) .await; } @@ -254,7 +254,7 @@ async fn mcp_server_should_delete_segments() { async fn mcp_server_should_poll_messages() { assert_response::<PolledMessages>( "poll_messages", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "offset": 0 })), + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })), |messages| { assert_eq!(messages.messages.len(), 1); let message = &messages.messages[0]; @@ -271,7 +271,7 @@ async fn mcp_server_should_poll_messages() { async fn mcp_server_should_send_messages() { assert_empty_response( "send_messages", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "messages": [ + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "messages": [ { "payload": "test" } @@ -375,11 +375,11 @@ async fn mcp_server_should_delete_consumer_group() { async fn mcp_server_should_return_consumer_offset() { assert_response::<Option<ConsumerOffsetInfo>>( "get_consumer_offset", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1 })), + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0 })), |offset| { assert!(offset.is_some()); let offset = offset.unwrap(); - assert_eq!(offset.partition_id, 1); + assert_eq!(offset.partition_id, 0); assert_eq!(offset.stored_offset, 0); assert_eq!(offset.current_offset, 0); }, @@ -392,7 +392,7 @@ async fn mcp_server_should_return_consumer_offset() { async fn mcp_server_should_store_consumer_offset() { assert_empty_response( "store_consumer_offset", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "offset": 0 })), + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })), ) .await; } @@ -402,7 +402,7 @@ async fn mcp_server_should_store_consumer_offset() { async fn mcp_server_should_delete_consumer_offset() { assert_empty_response( "delete_consumer_offset", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "offset": 0 })), + Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })), ) .await; } @@ -630,7 +630,7 @@ async fn seed_data(iggy_server_address: &str) { .send_messages( &STREAM_ID, &TOPIC_ID, - &Partitioning::partition_id(1), + &Partitioning::partition_id(0), &mut messages, ) .await @@ -640,7 +640,7 @@ async fn seed_data(iggy_server_address: &str) { Consumer::new(Identifier::named(CONSUMER_NAME).expect("Failed to create consumer")); iggy_client - .store_consumer_offset(&consumer, &STREAM_ID, &TOPIC_ID, Some(1), 0) + .store_consumer_offset(&consumer, &STREAM_ID, &TOPIC_ID, Some(0), 0) .await .expect("Failed to store consumer offset"); diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs b/core/integration/tests/server/scenarios/encryption_scenario.rs index 88c115851..77a81a6c1 100644 --- a/core/integration/tests/server/scenarios/encryption_scenario.rs +++ b/core/integration/tests/server/scenarios/encryption_scenario.rs @@ -130,7 +130,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp .send_messages( &Identifier::numeric(stream_id).unwrap(), &Identifier::numeric(topic_id).unwrap(), - &Partitioning::partition_id(1), + &Partitioning::partition_id(0), &mut messages_batch_1, ) .await @@ -160,7 +160,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp .poll_messages( &Identifier::numeric(stream_id).unwrap(), &Identifier::numeric(topic_id).unwrap(), - Some(1), + Some(0), &consumer, &PollingStrategy::offset(0), messages_per_batch.try_into().unwrap(), @@ -275,7 +275,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp .send_messages( &Identifier::numeric(stream_id).unwrap(), &Identifier::numeric(topic_id).unwrap(), - &Partitioning::partition_id(1), // Use specific partition for testing + &Partitioning::partition_id(0), // Use specific partition for testing &mut messages_batch_2, ) .await @@ -300,7 +300,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp .poll_messages( &Identifier::numeric(stream_id).unwrap(), &Identifier::numeric(topic_id).unwrap(), - Some(1), + Some(0), &consumer, &PollingStrategy::offset(0), messages_per_batch as u32 * 2, diff --git a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs index bac2b1858..4aa3e70c1 100644 --- a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs +++ b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs @@ -69,7 +69,7 @@ pub async fn run(client: &IggyClient) { .poll_messages( &Identifier::named(stream_name).unwrap(), &Identifier::named(topic_name).unwrap(), - Some(1), + Some(0), &Consumer::default(), &PollingStrategy::offset(0), 1, diff --git a/core/server/server.http b/core/server/server.http index cb46d4213..19ceb0861 100644 --- a/core/server/server.http +++ b/core/server/server.http @@ -18,7 +18,7 @@ @url = http://localhost:3000 @stream_id = 1 @topic_id = 1 -@partition_id = 1 +@partition_id = 0 @consumer_group_id = 1 @consumer_id = 1 @client_id = 1 diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index 5beb6f41e..a103e8ae1 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -38,7 +38,7 @@ use crate::{ streams::stream2, topics::{consumer_group2, topic2}, users::user::User, - utils::file::overwrite, + utils::{crypto, file::overwrite}, }, versioning::SemanticVersion, }; @@ -48,8 +48,8 @@ use error_set::ErrContext; use iggy_common::{ ConsumerKind, IggyByteSize, IggyError, defaults::{ - DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, - MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH, + DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH, + MIN_USERNAME_LENGTH, }, }; use std::{collections::HashSet, env, path::Path, sync::Arc}; @@ -289,9 +289,8 @@ pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError> } pub fn create_root_user() -> User { - info!("Creating root user..."); - let username = env::var(IGGY_ROOT_USERNAME_ENV); - let password = env::var(IGGY_ROOT_PASSWORD_ENV); + let mut username = env::var(IGGY_ROOT_USERNAME_ENV); + let mut password = env::var(IGGY_ROOT_PASSWORD_ENV); if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) { panic!( "When providing the custom root user credentials, both username and password must be set." @@ -300,11 +299,15 @@ pub fn create_root_user() -> User { if username.is_ok() && password.is_ok() { info!("Using the custom root user credentials."); } else { - info!("Using the default root user credentials."); + info!("Using the default root user credentials..."); + username = Ok(DEFAULT_ROOT_USERNAME.to_string()); + let generated_password = crypto::generate_secret(20..40); + println!("Generated root user password: {generated_password}"); + password = Ok(generated_password); } - let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string()); - let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string()); + let username = username.expect("Root username is not set."); + let password = password.expect("Root password is not set."); if username.is_empty() || password.is_empty() { panic!("Root user credentials are not set."); } @@ -320,6 +323,7 @@ pub fn create_root_user() -> User { if password.len() > MAX_PASSWORD_LENGTH { panic!("Root password is too long."); } + User::root(&username, &password) } diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 1a67d64d8..c5033213a 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -127,6 +127,9 @@ pub async fn spawn_quic_server( shard.quic_bound_address.set(Some(actual_addr)); if addr.port() == 0 { + // Notify config writer on shard 0 + let _ = shard.config_writer_notify.try_send(()); + // Broadcast to other shards for SO_REUSEPORT binding let event = ShardEvent::AddressBound { protocol: iggy_common::TransportProtocol::Quic, diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 0996e1fc1..328578eee 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -135,7 +135,7 @@ pub struct IggyShard { pub(crate) quic_bound_address: Cell<Option<SocketAddr>>, pub(crate) websocket_bound_address: Cell<Option<SocketAddr>>, pub(crate) http_bound_address: Cell<Option<SocketAddr>>, - config_writer_notify: async_channel::Sender<()>, + pub(crate) config_writer_notify: async_channel::Sender<()>, config_writer_receiver: async_channel::Receiver<()>, pub(crate) task_registry: Rc<TaskRegistry>, } @@ -847,9 +847,9 @@ impl IggyShard { stream_id, topic_id, partition_id, - .. + fsync, } => { - self.flush_unsaved_buffer_bypass_auth(&stream_id, &topic_id, partition_id) + self.flush_unsaved_buffer_bypass_auth(&stream_id, &topic_id, partition_id, fsync) .await?; Ok(()) } diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 7c543c3b7..bc04de427 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -281,9 +281,11 @@ impl IggyShard { stream_id: &Identifier, topic_id: &Identifier, partition_id: usize, - _fsync: bool, + fsync: bool, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; + self.ensure_partition_exists(stream_id, topic_id, partition_id)?; + let numeric_stream_id = self .streams2 .with_stream_by_id(stream_id, streams::helpers::get_stream_id()); @@ -304,7 +306,7 @@ impl IggyShard { format!("{COMPONENT} (error: {error}) - permission denied to append messages for user {} on stream ID: {}, topic ID: {}", session.get_user_id(), numeric_stream_id as u32, numeric_topic_id as u32) })?; - self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id) + self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id, fsync) .await?; Ok(()) } @@ -314,8 +316,10 @@ impl IggyShard { stream_id: &Identifier, topic_id: &Identifier, partition_id: usize, + fsync: bool, ) -> Result<(), IggyError> { - self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id) + self.ensure_partition_exists(stream_id, topic_id, partition_id)?; + self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id, fsync) .await } @@ -324,6 +328,7 @@ impl IggyShard { stream_id: &Identifier, topic_id: &Identifier, partition_id: usize, + fsync: bool, ) -> Result<(), IggyError> { let batches = self.streams2.with_partition_by_id_mut( stream_id, @@ -342,6 +347,14 @@ impl IggyShard { &self.config.system, ) .await?; + + // Ensure all data is flushed to disk before returning + if fsync { + self.streams2 + .fsync_all_messages(stream_id, topic_id, partition_id) + .await?; + } + Ok(()) } diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 0ad76809c..7dccec27f 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -1337,9 +1337,16 @@ impl Streams { partition_id: usize, ) -> Result<(), IggyError> { let storage = self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { - log.active_storage().clone() + if !log.has_segments() { + return None; + } + Some(log.active_storage().clone()) }); + let Some(storage) = storage else { + return Ok(()); + }; + if storage.messages_writer.is_none() || storage.index_writer.is_none() { return Ok(()); } diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index 580b7a60e..049c5b7f0 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -103,6 +103,9 @@ pub async fn start( shard.tcp_bound_address.set(Some(actual_addr)); if addr.port() == 0 { + // Notify config writer on shard 0 + let _ = shard.config_writer_notify.try_send(()); + // Broadcast to other shards for SO_REUSEPORT binding let event = ShardEvent::AddressBound { protocol: TransportProtocol::Tcp, diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs index 7dc419603..c41233504 100644 --- a/core/server/src/tcp/tcp_tls_listener.rs +++ b/core/server/src/tcp/tcp_tls_listener.rs @@ -72,6 +72,9 @@ pub(crate) async fn start( if shard.id == 0 { shard.tcp_bound_address.set(Some(actual_addr)); if addr.port() == 0 { + // Notify config writer on shard 0 + let _ = shard.config_writer_notify.try_send(()); + let event = ShardEvent::AddressBound { protocol: TransportProtocol::Tcp, address: actual_addr,
