This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new b766cf70 feat(io_uring): fix loading segments on startup (#2176)
b766cf70 is described below
commit b766cf70d9b8d144f70a7a3e454a0dbfce0a151b
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Thu Sep 18 19:00:17 2025 +0200
feat(io_uring): fix loading segments on startup (#2176)
---
.../tests/streaming/get_by_timestamp.rs | 42 +-
.../binary/handlers/topics/create_topic_handler.rs | 1 -
core/server/src/bootstrap.rs | 434 +++++++++++++++--
.../src/compat/index_rebuilding/index_rebuilder.rs | 4 +-
core/server/src/configs/system.rs | 14 +-
core/server/src/main.rs | 53 ++-
core/server/src/shard/builder.rs | 3 +-
core/server/src/shard/mod.rs | 96 +++-
core/server/src/shard/system/partitions.rs | 4 +-
core/server/src/slab/partitions.rs | 6 +-
core/server/src/slab/streams.rs | 29 +-
core/server/src/state/system.rs | 29 +-
core/server/src/streaming/partitions/helpers.rs | 6 -
core/server/src/streaming/partitions/log.rs | 15 +-
core/server/src/streaming/partitions/mod.rs | 1 -
core/server/src/streaming/partitions/partition2.rs | 2 +-
core/server/src/streaming/partitions/storage.rs | 266 -----------
core/server/src/streaming/segments/mod.rs | 5 -
.../src/streaming/segments/reading_messages.rs | 401 ----------------
core/server/src/streaming/segments/segment.rs | 529 ---------------------
core/server/src/streaming/segments/storage.rs | 9 +-
21 files changed, 631 insertions(+), 1318 deletions(-)
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs
b/core/integration/tests/streaming/get_by_timestamp.rs
index a40d2904..de30345a 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -16,17 +16,17 @@
* under the License.
*/
-use crate::streaming::common::test_setup::TestSetup;
use super::bootstrap_test_environment;
+use crate::streaming::common::test_setup::TestSetup;
use bytes::BytesMut;
use iggy::prelude::*;
use server::configs::cache_indexes::CacheIndexesConfig;
use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig};
+use server::shard::namespace::IggyFullNamespace;
use server::shard::system::messages::PollingArgs;
use server::streaming::polling_consumer::PollingConsumer;
use server::streaming::segments::IggyMessagesBatchMut;
use server::streaming::traits::MainOps;
-use server::shard::namespace::IggyFullNamespace;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
@@ -117,14 +117,20 @@ async fn test_get_messages_by_timestamp(
};
// Use the bootstrap method to create streams with proper slab structure
- let bootstrap_result = bootstrap_test_environment(shard_id as u16,
&config).await.unwrap();
+ let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config)
+ .await
+ .unwrap();
let streams = bootstrap_result.streams;
let stream_identifier = bootstrap_result.stream_id;
let topic_identifier = bootstrap_result.topic_id;
let partition_id = bootstrap_result.partition_id;
-
+
// Create namespace for MainOps calls
- let namespace = IggyFullNamespace::new(stream_identifier.clone(),
topic_identifier.clone(), partition_id);
+ let namespace = IggyFullNamespace::new(
+ stream_identifier.clone(),
+ topic_identifier.clone(),
+ partition_id,
+ );
let mut all_messages = Vec::with_capacity(total_messages_count as usize);
@@ -193,7 +199,10 @@ async fn test_get_messages_by_timestamp(
let batch =
IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size);
assert_eq!(batch.count(), batch_len);
- streams.append_messages(shard_id as u16, &config, &namespace,
batch).await.unwrap();
+ streams
+ .append_messages(shard_id as u16, &config, &namespace, batch)
+ .await
+ .unwrap();
// Capture the timestamp of this batch
batch_timestamps.push(IggyTimestamp::now());
@@ -209,10 +218,15 @@ async fn test_get_messages_by_timestamp(
let total_sent_messages = total_messages_count;
// Create a single consumer to reuse throughout the test
- let consumer = PollingConsumer::consumer(&Identifier::numeric(1).unwrap(),
partition_id as usize);
+ let consumer =
+ PollingConsumer::consumer(&Identifier::numeric(1).unwrap(),
partition_id as usize);
// Test 1: All messages from initial timestamp
- let args = PollingArgs::new(PollingStrategy::timestamp(initial_timestamp),
total_sent_messages, false);
+ let args = PollingArgs::new(
+ PollingStrategy::timestamp(initial_timestamp),
+ total_sent_messages,
+ false,
+ );
let (_, all_loaded_messages) = streams
.poll_messages(&namespace, consumer, args)
.await
@@ -235,7 +249,11 @@ async fn test_get_messages_by_timestamp(
let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
let remaining_messages = total_sent_messages - prior_batches_sum;
- let args =
PollingArgs::new(PollingStrategy::timestamp(middle_timestamp),
remaining_messages, false);
+ let args = PollingArgs::new(
+ PollingStrategy::timestamp(middle_timestamp),
+ remaining_messages,
+ false,
+ );
let (_, middle_messages) = streams
.poll_messages(&namespace, consumer, args)
.await
@@ -265,7 +283,11 @@ async fn test_get_messages_by_timestamp(
// Test 4: Small subset from initial timestamp
let subset_size = std::cmp::min(3, total_sent_messages);
- let args = PollingArgs::new(PollingStrategy::timestamp(initial_timestamp),
subset_size, false);
+ let args = PollingArgs::new(
+ PollingStrategy::timestamp(initial_timestamp),
+ subset_size,
+ false,
+ );
let (_, subset_messages) = streams
.poll_messages(&namespace, consumer, args)
.await
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 577ab62f..5e456e6a 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -87,7 +87,6 @@ impl ServerCommandHandler for CreateTopic {
partitions,
};
let _responses = shard.broadcast_event_to_all_shards(event).await;
- // TODO: Create shard_table records for partitions.
let response = shard.streams2.with_topic_by_id(
&self.stream_id,
&Identifier::numeric(topic_id as u32).unwrap(),
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 8e3797b2..fc38b069 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -1,7 +1,13 @@
use crate::{
IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
- configs::{config_provider::ConfigProviderKind, server::ServerConfig,
system::SystemConfig},
- io::fs_utils,
+ compat::index_rebuilding::index_rebuilder::IndexRebuilder,
+ configs::{
+ cache_indexes::CacheIndexesConfig,
+ config_provider::ConfigProviderKind,
+ server::ServerConfig,
+ system::{INDEX_EXTENSION, LOG_EXTENSION, SystemConfig},
+ },
+ io::fs_utils::{self, DirEntry},
server_error::ServerError,
shard::{
system::info::SystemInfo,
@@ -18,15 +24,14 @@ use crate::{
},
state::system::{StreamState, TopicState, UserState},
streaming::{
- deduplication::message_deduplicator,
partitions::{
- consumer_offset::{self, ConsumerOffset},
- helpers::create_message_deduplicator,
- partition2::{self, ConsumerGroupOffsets, ConsumerOffsets},
+ consumer_offset::ConsumerOffset,
helpers::create_message_deduplicator,
+ journal::MemoryMessageJournal, log::SegmentedLog, partition2,
storage2::load_consumer_offsets,
},
persistence::persister::{FilePersister, FileWithSyncPersister,
PersisterKind},
personal_access_tokens::personal_access_token::PersonalAccessToken,
+ segments::{Segment2, storage::Storage},
stats::stats::{PartitionStats, StreamStats, TopicStats},
storage::SystemStorage,
streams::stream2,
@@ -38,6 +43,7 @@ use crate::{
};
use ahash::HashMap;
use compio::{fs::create_dir_all, runtime::Runtime};
+use error_set::ErrContext;
use iggy_common::{
ConsumerKind, IggyError, UserId,
defaults::{
@@ -46,9 +52,9 @@ use iggy_common::{
},
};
use std::{collections::HashSet, env, path::Path, sync::Arc};
-use tracing::info;
+use tracing::{info, warn};
-pub fn load_streams(
+pub async fn load_streams(
state: impl IntoIterator<Item = StreamState>,
config: &SystemConfig,
) -> Result<Streams, IggyError> {
@@ -121,56 +127,39 @@ pub fn load_streams(
let parent_stats = stats.clone();
let cgs = consumer_groups.into_values();
let partitions = partitions.into_values();
+
+ // Load each partition asynchronously and insert immediately
for partition_state in partitions {
info!(
"Loading partition with ID: {}, for topic with ID: {} from
state...",
partition_state.id, topic_id
);
+
+ let partition_id = partition_state.id;
+ let partition = load_partition(
+ config,
+ stream_id as usize,
+ topic_id,
+ partition_state,
+ parent_stats.clone(),
+ )
+ .await?;
+ // Insert partition into the container
streams.with_components_by_id(stream_id as usize, |(root, ..)|
{
- let parent_stats = parent_stats.clone();
root.topics()
- .with_components_by_id_mut(topic_id, |(root, ..)| {
- let stats =
Arc::new(PartitionStats::new(parent_stats));
- // TODO: This has to be sampled from segments, if
there exists more than 0 segements and the segment has more than 0 messages,
then we set it to true.
- let should_increment_offset = todo!();
- // TODO: This has to be sampled from segments and
it's indexes, offset = segment.start_offset + last_index.offset;
- let offset = todo!();
- let message_deduplicator =
create_message_deduplicator(config);
- let id = partition_state.id;
-
- let consumer_offset_path =
config.get_consumer_offsets_path(stream_id as usize, topic_id as usize, id as
usize);
- let consumer_group_offsets_path =
config.get_consumer_group_offsets_path(stream_id as usize, topic_id as usize,
id as usize);
- let consumer_offset =
Arc::new(load_consumer_offsets(&consumer_offset_path, ConsumerKind::Consumer)?
- .into_iter().map(|offset| {
- (offset.consumer_id as usize, offset)
- }).into());
- let consumer_group_offset =
Arc::new(load_consumer_offsets(&consumer_group_offsets_path,
ConsumerKind::ConsumerGroup)?.into_iter().map(|offset| {
- (offset.consumer_id as usize, offset)
- }).into());
-
- let log = Default::default();
- let partition = partition2::Partition::new(
- partition_state.created_at,
- should_increment_offset,
- stats,
- message_deduplicator,
- offset,
- consumer_offset,
- consumer_group_offset,
- log
- );
+ .with_components_by_id_mut(topic_id, |(mut root, ..)| {
let new_id =
root.partitions_mut().insert(partition);
assert_eq!(
- new_id, id as usize,
+ new_id, partition_id as usize,
"load_streams: partition id mismatch when
inserting partition, mismatch for partition with ID: {}, for topic with ID: {},
for stream with ID: {}",
- id, topic_id, stream_id
+ partition_id, topic_id, stream_id
);
- Ok(())
- })
- })?;
+ });
+ });
+
info!(
"Loaded partition with ID: {}, for topic with ID: {} from
state...",
- partition_state.id, topic_id
+ partition_id, topic_id
);
}
let partition_ids = streams.with_components_by_id(stream_id as
usize, |(root, ..)| {
@@ -384,3 +373,358 @@ pub async fn update_system_info(
storage.info.save(system_info).await?;
Ok(())
}
+
+async fn collect_log_files(partition_path: &str) -> Result<Vec<DirEntry>,
IggyError> {
+ let dir_entries = fs_utils::walk_dir(&partition_path)
+ .await
+ .map_err(|_| IggyError::CannotReadPartitions)?;
+ let mut log_files = Vec::new();
+ for entry in dir_entries {
+ if entry.is_dir {
+ continue;
+ }
+
+ let extension = entry.path.extension();
+ if extension.is_none() || extension.unwrap() != LOG_EXTENSION {
+ continue;
+ }
+
+ log_files.push(entry);
+ }
+
+ Ok(log_files)
+}
+
+pub async fn load_segments(
+ config: &SystemConfig,
+ stream_id: usize,
+ topic_id: usize,
+ partition_id: usize,
+ partition_path: String,
+ stats: Arc<PartitionStats>,
+) -> Result<SegmentedLog<MemoryMessageJournal>, IggyError> {
+ // Read directory entries to find log files using async fs_utils
+ let mut log_files = collect_log_files(&partition_path).await?;
+ log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
+ let mut log = SegmentedLog::<MemoryMessageJournal>::default();
+ for entry in log_files {
+ let log_file_name = entry
+ .path
+ .file_stem()
+ .unwrap()
+ .to_string_lossy()
+ .to_string();
+
+ let start_offset = log_file_name.parse::<u64>().unwrap();
+
+ // Build file paths directly
+ let messages_file_path = format!("{}/{}.{}", partition_path,
log_file_name, LOG_EXTENSION);
+ let index_file_path = format!("{}/{}.{}", partition_path,
log_file_name, INDEX_EXTENSION);
+ let time_index_path = index_file_path.replace(INDEX_EXTENSION,
"timeindex");
+
+ // Check if index files exist
+ async fn try_exists(path: &str) -> Result<bool, std::io::Error> {
+ match compio::fs::metadata(path).await {
+ Ok(_) => Ok(true),
+ Err(err) => match err.kind() {
+ std::io::ErrorKind::NotFound => Ok(false),
+ _ => Err(err),
+ },
+ }
+ }
+
+ let index_path_exists = try_exists(&index_file_path).await.unwrap();
+ let time_index_path_exists =
try_exists(&time_index_path).await.unwrap();
+ let index_cache_enabled = matches!(
+ config.segment.cache_indexes,
+ CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
+ );
+
+ // Rebuild indexes if index cache is enabled and index at path does
not exist
+ if index_cache_enabled && (!index_path_exists ||
time_index_path_exists) {
+ warn!(
+ "Index at path {} does not exist, rebuilding it based on
{}...",
+ index_file_path, messages_file_path
+ );
+ let now = std::time::Instant::now();
+ let index_rebuilder = IndexRebuilder::new(
+ messages_file_path.clone(),
+ index_file_path.clone(),
+ start_offset,
+ );
+ index_rebuilder.rebuild().await.unwrap_or_else(|e| {
+ panic!(
+ "Failed to rebuild index for partition with ID: {} for
stream with ID: {} and topic with ID: {}. Error: {e}",
+ partition_id, stream_id, topic_id,
+ )
+ });
+ info!(
+ "Rebuilding index for path {} finished, it took {} ms",
+ index_file_path,
+ now.elapsed().as_millis()
+ );
+ }
+
+ if time_index_path_exists {
+ compio::fs::remove_file(&time_index_path).await.unwrap();
+ }
+
+ // Get file metadata to determine segment properties
+ let messages_metadata = compio::fs::metadata(&messages_file_path)
+ .await
+ .map_err(|_| IggyError::CannotReadPartitions)?;
+ let messages_size = messages_metadata.len() as u32;
+
+ let index_size = match compio::fs::metadata(&index_file_path).await {
+ Ok(metadata) => metadata.len() as u32,
+ Err(_) => 0, // Default to 0 if index file doesn't exist
+ };
+
+ // Create storage for the segment using existing files
+ let storage = Storage::new(
+ &messages_file_path,
+ &index_file_path,
+ messages_size as u64,
+ index_size as u64,
+ config.partition.enforce_fsync,
+ config.partition.enforce_fsync,
+ true, // file_exists = true for existing segments
+ )
+ .await?;
+
+ // Load indexes from disk to calculate the correct end offset and
cache them if needed
+ // This matches the logic in Segment::load_from_disk method
+ let loaded_indexes = {
+ storage.
+ index_reader
+ .as_ref()
+ .unwrap()
+ .load_all_indexes_from_disk()
+ .await
+ .with_error_context(|error| format!("Failed to load indexes during
startup for stream ID: {}, topic ID: {}, partition_id: {}, {error}", stream_id,
topic_id, partition_id))
+ .map_err(|_| IggyError::CannotReadFile)?
+ };
+
+ // Calculate end offset based on loaded indexes
+ let end_offset = if loaded_indexes.count() == 0 {
+ 0
+ } else {
+ let last_index_offset = loaded_indexes.last().unwrap().offset() as
u64;
+ start_offset + last_index_offset
+ };
+
+ let (start_timestamp, end_timestamp) = if loaded_indexes.count() == 0 {
+ (0, 0)
+ } else {
+ (
+ loaded_indexes.get(0).unwrap().timestamp(),
+ loaded_indexes.last().unwrap().timestamp(),
+ )
+ };
+
+ // Create the new Segment with proper values from file system
+ let mut segment = Segment2::new(
+ start_offset,
+ config.segment.size,
+ config.segment.message_expiry,
+ );
+
+ // Set properties based on file data
+ segment.start_timestamp = start_timestamp;
+ segment.end_timestamp = end_timestamp;
+ segment.end_offset = end_offset;
+ segment.size = messages_size;
+ segment.sealed = true; // Persisted segments are assumed to be sealed
+
+ if config.partition.validate_checksum {
+ info!(
+ "Validating checksum for segment at offset {} in stream ID:
{}, topic ID: {}, partition ID: {}",
+ start_offset, stream_id, topic_id, partition_id
+ );
+ let messages_count = loaded_indexes.count() as u32;
+ if messages_count > 0 {
+ const BATCH_COUNT: u32 = 10000;
+ let mut current_relative_offset = 0u32;
+ let mut processed_count = 0u32;
+
+ while processed_count < messages_count {
+ let remaining_count = messages_count - processed_count;
+ let batch_count = std::cmp::min(BATCH_COUNT,
remaining_count);
+ let batch_indexes = loaded_indexes
+ .slice_by_offset(current_relative_offset, batch_count)
+ .unwrap();
+
+ let messages_reader =
storage.messages_reader.as_ref().unwrap();
+ match
messages_reader.load_messages_from_disk(batch_indexes).await {
+ Ok(messages_batch) => {
+ if let Err(e) =
messages_batch.validate_checksums() {
+ return
Err(IggyError::CannotReadPartitions).with_error_context(|_| {
+ format!(
+ "Failed to validate message checksum
for segment at offset {} in stream ID: {}, topic ID: {}, partition ID: {},
error: {}",
+ start_offset, stream_id, topic_id,
partition_id, e
+ )
+ });
+ }
+ processed_count += messages_batch.count();
+ current_relative_offset += batch_count;
+ }
+ Err(e) => {
+ return Err(e).with_error_context(|_| {
+ format!(
+ "Failed to load messages from disk for
checksum validation at offset {} in stream ID: {}, topic ID: {}, partition ID:
{}",
+ start_offset, stream_id, topic_id,
partition_id
+ )
+ });
+ }
+ }
+ }
+ info!(
+ "Checksum validation completed for segment at offset {}",
+ start_offset
+ );
+ }
+ }
+
+ // Add segment to log
+ log.add_persisted_segment(segment, storage);
+
+ // Increment stats for partition - this matches the behavior from
partition storage load method
+ stats.increment_segments_count(1);
+
+ // Increment size and message counts based on the loaded segment data
+ stats.increment_size_bytes(messages_size as u64);
+
+ // Calculate message count from segment data (end_offset -
start_offset + 1 if there are messages)
+ let messages_count = if end_offset > start_offset {
+ (end_offset - start_offset + 1) as u64
+ } else if messages_size > 0 {
+ // Fallback: estimate based on loaded indexes count if available
+ loaded_indexes.count() as u64
+ } else {
+ 0
+ };
+
+ if messages_count > 0 {
+ stats.increment_messages_count(messages_count);
+ }
+
+ // Now handle index caching based on configuration
+ let should_cache_indexes = match config.segment.cache_indexes {
+ CacheIndexesConfig::All => true,
+ CacheIndexesConfig::OpenSegment => false, // Will be handled after
all segments are loaded
+ CacheIndexesConfig::None => false,
+ };
+
+ // Set the loaded indexes if we should cache them
+ if should_cache_indexes {
+ let segment_index = log.segments().len() - 1;
+ log.set_segment_indexes(segment_index, loaded_indexes);
+ }
+ }
+
+ // Handle OpenSegment cache configuration: only the last segment should
keep its indexes
+ if matches!(
+ config.segment.cache_indexes,
+ CacheIndexesConfig::OpenSegment
+ ) && log.has_segments()
+ {
+ let segments_count = log.segments().len();
+ if segments_count > 0 {
+ // Use the IndexReader from the last segment's storage to load
indexes
+ let last_storage = log.storages().last().unwrap();
+ match last_storage.index_reader.as_ref() {
+ Some(index_reader) => {
+ if let Ok(loaded_indexes) =
index_reader.load_all_indexes_from_disk().await {
+ log.set_segment_indexes(segments_count - 1,
loaded_indexes);
+ }
+ }
+ None => {
+ warn!("Index reader not available for last segment in
OpenSegment mode");
+ }
+ }
+ }
+ }
+
+ Ok(log)
+}
+
+async fn load_partition(
+ config: &SystemConfig,
+ stream_id: usize,
+ topic_id: usize,
+ partition_state: crate::state::system::PartitionState,
+ parent_stats: Arc<TopicStats>,
+) -> Result<partition2::Partition, IggyError> {
+ use std::sync::atomic::AtomicU64;
+ let stats = Arc::new(PartitionStats::new(parent_stats));
+ let partition_id = partition_state.id as u32;
+
+ // Load segments from disk to determine should_increment_offset and
current offset
+ let partition_path = config.get_partition_path(stream_id, topic_id,
partition_id as usize);
+ let log_files = collect_log_files(&partition_path).await?;
+ let should_increment_offset = !log_files.is_empty()
+ && log_files
+ .first()
+ .map(|entry| {
+ let log_file_name = entry
+ .path
+ .file_stem()
+ .unwrap()
+ .to_string_lossy()
+ .to_string();
+
+ let start_offset = log_file_name.parse::<u64>().unwrap();
+
+ // Build file paths directly
+ let messages_file_path =
+ format!("{}/{}.{}", partition_path, start_offset,
LOG_EXTENSION);
+ std::fs::metadata(&messages_file_path).is_ok_and(|metadata|
metadata.len() > 0)
+ })
+ .unwrap_or_else(|| false);
+
+ info!(
+ "Loading partition with ID: {} for stream with ID: {} and topic with
ID: {}, for path: {} from disk...",
+ partition_id, stream_id, topic_id, partition_path
+ );
+ // Load consumer offsets
+ let message_deduplicator = create_message_deduplicator(config);
+ let consumer_offset_path =
+ config.get_consumer_offsets_path(stream_id, topic_id, partition_id as
usize);
+ let consumer_group_offsets_path =
+ config.get_consumer_group_offsets_path(stream_id, topic_id,
partition_id as usize);
+
+ let consumer_offset = Arc::new(
+ load_consumer_offsets(&consumer_offset_path, ConsumerKind::Consumer)?
+ .into_iter()
+ .map(|offset| (offset.consumer_id as usize, offset))
+ .collect::<HashMap<usize, ConsumerOffset>>()
+ .into(),
+ );
+
+ let consumer_group_offset = Arc::new(
+ load_consumer_offsets(&consumer_group_offsets_path,
ConsumerKind::ConsumerGroup)?
+ .into_iter()
+ .map(|offset| (offset.consumer_id as usize, offset))
+ .collect::<HashMap<usize, ConsumerOffset>>()
+ .into(),
+ );
+
+ let log = Default::default();
+ let partition = partition2::Partition::new(
+ partition_state.created_at,
+ should_increment_offset,
+ stats,
+ message_deduplicator,
+ Arc::new(Default::default()),
+ consumer_offset,
+ consumer_group_offset,
+ log,
+ );
+
+ info!(
+ "Loaded partition with ID: {} for stream with ID: {} and topic with
ID: {}",
+ partition_id, stream_id, topic_id
+ );
+
+ Ok(partition)
+}
diff --git a/core/server/src/compat/index_rebuilding/index_rebuilder.rs
b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
index dbc93ebe..402739be 100644
--- a/core/server/src/compat/index_rebuilding/index_rebuilder.rs
+++ b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
@@ -18,13 +18,11 @@
use crate::server_error::CompatError;
use crate::streaming::utils::file;
-use async_zip::tokio::write;
use compio::{
fs::File,
- io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
BufReader, BufWriter},
+ io::{AsyncBufRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader,
BufWriter},
};
use iggy_common::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader};
-use std::io::{Seek, SeekFrom};
pub struct IndexRebuilder {
pub messages_file_path: String,
diff --git a/core/server/src/configs/system.rs
b/core/server/src/configs/system.rs
index add75b31..fae172ca 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -31,7 +31,8 @@ use serde::{Deserialize, Serialize};
use serde_with::DisplayFromStr;
use serde_with::serde_as;
-const INDEX_EXTENSION: &str = "index";
+pub const INDEX_EXTENSION: &str = "index";
+pub const LOG_EXTENSION: &str = "log";
#[derive(Debug, Deserialize, Serialize)]
pub struct SystemConfig {
@@ -283,6 +284,17 @@ impl SystemConfig {
)
}
+ pub fn get_messages_file_path(
+ &self,
+ stream_id: usize,
+ topic_id: usize,
+ partition_id: usize,
+ start_offset: u64,
+ ) -> String {
+ let path = self.get_segment_path(stream_id, topic_id, partition_id,
start_offset);
+ format!("{path}.{LOG_EXTENSION}")
+ }
+
pub fn get_index_path(
&self,
stream_id: usize,
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 44917d0b..02ed206a 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -30,7 +30,7 @@ use error_set::ErrContext;
use figlet_rs::FIGfont;
use iggy_common::create_user::CreateUser;
use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
-use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
+use iggy_common::{Aes256GcmEncryptor, EncryptorKind, Identifier, IggyError};
use lending_iterator::lending_iterator::constructors::into_lending_iter;
use server::args::Args;
use server::binary::handlers::streams;
@@ -49,7 +49,8 @@ use server::log::tokio_console::Logging;
use server::server_error::{ConfigError, ServerError};
use server::shard::namespace::IggyNamespace;
use server::shard::system::info::SystemInfo;
-use server::shard::{IggyShard, ShardInfo};
+use server::shard::{IggyShard, ShardInfo, calculate_shard_assignment};
+use server::slab::traits_ext::{EntityComponentSystem,
EntityComponentSystemMutCell, IntoComponents};
use server::state::StateKind;
use server::state::command::EntryCommand;
use server::state::file::FileState;
@@ -189,7 +190,7 @@ async fn main() -> Result<(), ServerError> {
));
let state = SystemState::load(state).await?;
let (streams_state, users_state) = state.decompose();
- let streams = load_streams(streams_state.into_values(), &config.system)?;
+ let streams = load_streams(streams_state.into_values(),
&config.system).await?;
let users = load_users(users_state.into_values());
// ELEVENTH DISCRETE LOADING STEP.
@@ -221,11 +222,34 @@ async fn main() -> Result<(), ServerError> {
let (connections, shutdown_handles) =
create_shard_connections(&shards_set);
let mut handles = Vec::with_capacity(shards_set.len());
+ // TODO: Persist the shards table and load it from the disk, so it does
not have to be recomputed on every startup.
// THIRTEENTH DISCRETE LOADING STEP.
// Shared resources bootstrap.
let shards_table = Box::new(DashMap::with_capacity(SHARDS_TABLE_CAPACITY));
let shards_table = Box::leak(shards_table);
let shards_table: EternalPtr<DashMap<IggyNamespace, ShardInfo>> =
shards_table.into();
+ streams.with_components(|components| {
+ let (root, ..) = components.into_components();
+ for (_, stream) in root.iter() {
+ stream.topics().with_components(|components| {
+ let (root, ..) = components.into_components();
+ for (_, topic) in root.iter() {
+ topic.partitions().with_components(|components| {
+ let (root, ..) = components.into_components();
+ for (_, partition) in root.iter() {
+ let stream_id = stream.id();
+ let topic_id = topic.id();
+ let partition_id = partition.id();
+ let ns = IggyNamespace::new(stream_id, topic_id,
partition_id);
+ let shard_id = calculate_shard_assignment(&ns,
shards_set.len() as u32);
+ let shard_info = ShardInfo::new(shard_id);
+ shards_table.insert(ns, shard_info);
+ }
+ });
+ }
+ })
+ }
+ });
for shard_id in shards_set {
let id = shard_id as u16;
@@ -245,6 +269,28 @@ async fn main() -> Result<(), ServerError> {
encryptor.clone(),
));
+ // Ergh... I knew it would backfire to include `Log` as part of the
`Partition` entity.
+ // We have to initialize a default log for every partition once we
`Clone` the Streams / Topics / Partitions,
+ // because the `Clone` impl for `Partition` does not clone the actual
log, it just creates an empty one.
+ streams.with_components(|components| {
+ let (root, ..) = components.into_components();
+ for (_, stream) in root.iter() {
+ stream.topics().with_components_mut(|components| {
+ let (mut root, ..) = components.into_components();
+ for (_, topic) in root.iter_mut() {
+ let partitions_count = topic.partitions().len();
+ for log_id in 0..partitions_count {
+ let id =
topic.partitions_mut().insert_default_log();
+ assert_eq!(
+ id, log_id,
+ "main: partition_insert_default_log: id
mismatch when creating default log"
+ );
+ }
+ }
+ })
+ }
+ });
+
let handle = std::thread::Builder::new()
.name(format!("shard-{id}"))
.spawn(move || {
@@ -329,7 +375,6 @@ async fn main() -> Result<(), ServerError> {
let _command_handler = BackgroundServerCommandHandler::new(system.clone(),
&config)
.install_handler(SaveMessagesExecutor)
.install_handler(MaintainMessagesExecutor)
- .install_handler(ArchiveStateExecutor)
.install_handler(CleanPersonalAccessTokensExecutor)
.install_handler(SysInfoPrintExecutor)
.install_handler(VerifyHeartbeatsExecutor);
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 615a5f71..fc203b77 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -149,8 +149,7 @@ impl IggyShardBuilder {
id: id,
shards: shards,
shards_table,
- //streams2: streams, // TODO: Fixme
- streams2: Default::default(),
+ streams2: streams, // TODO: Fixme
users: RefCell::new(users),
storage: storage,
encryptor: encryptor,
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index d4f57a89..e64cafb6 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -70,7 +70,10 @@ use crate::{
},
},
shard_error, shard_info, shard_warn,
- slab::{streams::Streams, traits_ext::EntityMarker},
+ slab::{
+ streams::Streams,
+ traits_ext::{EntityComponentSystem, EntityMarker, Insert},
+ },
state::{
StateKind,
file::FileState,
@@ -235,6 +238,7 @@ impl IggyShard {
}
pub async fn init(&self) -> Result<(), IggyError> {
+ self.load_segments().await?;
let _ = self.load_users().await;
Ok(())
}
@@ -244,7 +248,7 @@ impl IggyShard {
// loads streams and starts accepting connections. This is necessary to
// have the correct statistics when the server starts.
let now = Instant::now();
- //self.get_stats().await?;
+ self.get_stats().await?;
shard_info!(self.id, "Starting...");
self.init().await?;
// TODO: Fixme
@@ -294,6 +298,82 @@ impl IggyShard {
Ok(())
}
+ async fn load_segments(&self) -> Result<(), IggyError> {
+ use crate::bootstrap::load_segments;
+ use crate::shard::namespace::IggyNamespace;
+ for shard_entry in self.shards_table.iter() {
+ let (namespace, shard_info) = shard_entry.pair();
+
+ if shard_info.id == self.id {
+ let stream_id = namespace.stream_id();
+ let topic_id = namespace.topic_id();
+ let partition_id = namespace.partition_id();
+
+ shard_info!(
+ self.id,
+ "Loading segments for stream: {}, topic: {}, partition:
{}",
+ stream_id,
+ topic_id,
+ partition_id
+ );
+
+ let partition_path =
+ self.config
+ .system
+ .get_partition_path(stream_id, topic_id, partition_id);
+ let stats = self.streams2.with_partition_by_id(
+ &Identifier::numeric(stream_id as u32).unwrap(),
+ &Identifier::numeric(topic_id as u32).unwrap(),
+ partition_id,
+ |(_, stats, ..)| stats.clone(),
+ );
+ match load_segments(
+ &self.config.system,
+ stream_id,
+ topic_id,
+ partition_id,
+ partition_path,
+ stats,
+ )
+ .await
+ {
+ Ok(loaded_log) => {
+ self.streams2.with_partition_by_id_mut(
+ &Identifier::numeric(stream_id as u32).unwrap(),
+ &Identifier::numeric(topic_id as u32).unwrap(),
+ partition_id,
+ |(_,_,_ , offset, .., mut log)| {
+ *log = loaded_log;
+ let current_offset =
log.active_segment().end_offset;
+ offset.store(current_offset,
Ordering::Relaxed);
+ },
+ );
+ shard_info!(
+ self.id,
+ "Successfully loaded segments for stream: {},
topic: {}, partition: {}",
+ stream_id,
+ topic_id,
+ partition_id
+ );
+ }
+ Err(e) => {
+ shard_error!(
+ self.id,
+ "Failed to load segments for stream: {}, topic:
{}, partition: {}: {}",
+ stream_id,
+ topic_id,
+ partition_id,
+ e
+ );
+ return Err(e);
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
async fn load_users(&self) -> Result<(), IggyError> {
let users = self.users.borrow();
let users_count = users.len();
@@ -330,12 +410,6 @@ impl IggyShard {
self.shards.len() as u32
}
- pub fn calculate_shard_assignment(&self, ns: &IggyNamespace) -> u16 {
- let mut hasher = Murmur3Hasher::default();
- hasher.write_u64(ns.inner());
- (hasher.finish32() % self.get_available_shards_count()) as u16
- }
-
pub async fn handle_shard_message(&self, message: ShardMessage) ->
Option<ShardResponse> {
match message {
ShardMessage::Request(request) => match
self.handle_request(request).await {
@@ -955,3 +1029,9 @@ impl IggyShard {
}
}
}
+
+pub fn calculate_shard_assignment(ns: &IggyNamespace, upperbound: u32) -> u16 {
+ let mut hasher = Murmur3Hasher::default();
+ hasher.write_u64(ns.inner());
+ (hasher.finish32() % upperbound) as u16
+}
diff --git a/core/server/src/shard/system/partitions.rs
b/core/server/src/shard/system/partitions.rs
index 432eb0fd..05c0c74c 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -19,6 +19,7 @@
use super::COMPONENT;
use crate::shard::IggyShard;
use crate::shard::ShardInfo;
+use crate::shard::calculate_shard_assignment;
use crate::shard::namespace::IggyNamespace;
use crate::shard_info;
use crate::slab::traits_ext::EntityMarker;
@@ -112,10 +113,11 @@ impl IggyShard {
self.metrics.increment_partitions(partitions_count);
self.metrics.increment_segments(partitions_count);
+ let shards_count = self.get_available_shards_count();
for partition_id in partitions.iter().map(|p| p.id()) {
// TODO: Create shard table recordsj.
let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id,
partition_id);
- let shard_id = self.calculate_shard_assignment(&ns);
+ let shard_id = calculate_shard_assignment(&ns, shards_count);
let shard_info = ShardInfo::new(shard_id);
let is_current_shard = self.id == shard_info.id;
self.insert_shard_table_record(ns, shard_info);
diff --git a/core/server/src/slab/partitions.rs
b/core/server/src/slab/partitions.rs
index 7c7f15b7..34ca117d 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -52,7 +52,7 @@ impl Clone for Partitions {
offset: self.offset.clone(),
consumer_offset: self.consumer_offset.clone(),
consumer_group_offset: self.consumer_group_offset.clone(),
- log: Slab::with_capacity(PARTITIONS_CAPACITY),
+ log: Slab::with_capacity(PARTITIONS_CAPACITY), // Empty log, we
don't clone the actual logs.
}
}
}
@@ -188,6 +188,10 @@ impl Partitions {
self.root.len()
}
+ pub fn insert_default_log(&mut self) -> ContainerId {
+ self.log.insert(Default::default())
+ }
+
pub fn with_partition_by_id<T>(
&self,
id: ContainerId,
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 0343231f..549b8d82 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -4,16 +4,24 @@ use crate::{
shard::{namespace::IggyFullNamespace, system::messages::PollingArgs},
shard_info,
slab::{
- consumer_groups::ConsumerGroups, helpers, partitions::{self,
Partitions}, topics::Topics, traits_ext::{
+ Keyed,
+ consumer_groups::ConsumerGroups,
+ helpers,
+ partitions::{self, Partitions},
+ topics::Topics,
+ traits_ext::{
ComponentsById, DeleteCell, EntityComponentSystem,
EntityComponentSystemMutCell,
InsertCell, InteriorMutability, IntoComponents,
- }, Keyed
+ },
},
streaming::{
- partitions::{journal::Journal, partition2::{PartitionRef,
PartitionRefMut}},
+ partitions::{
+ journal::Journal,
+ partition2::{PartitionRef, PartitionRefMut},
+ },
polling_consumer::PollingConsumer,
segments::{
- storage::create_segment_storage, IggyMessagesBatchMut,
IggyMessagesBatchSet, Segment2
+ IggyMessagesBatchMut, IggyMessagesBatchSet, Segment2,
storage::create_segment_storage,
},
stats::stats::StreamStats,
streams::{
@@ -166,14 +174,19 @@ impl MainOps for Streams {
streaming_partitions::helpers::calculate_current_offset(),
);
- let current_position = self.with_partition_by_id(stream_id, topic_id,
partition_id, |(.., log)| {
- log.journal().inner().size + log.active_segment().size
- });
+ let current_position =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
+ log.journal().inner().size + log.active_segment().size
+ });
self.with_partition_by_id_async(
stream_id,
topic_id,
partition_id,
-
streaming_partitions::helpers::deduplicate_messages(current_offset,
current_position, &mut input),
+ streaming_partitions::helpers::deduplicate_messages(
+ current_offset,
+ current_position,
+ &mut input,
+ ),
)
.await;
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index 2c3ca43b..b5535d0e 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -30,17 +30,18 @@ use iggy_common::MaxTopicSize;
use iggy_common::create_user::CreateUser;
use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
use iggy_common::{IdKind, Identifier, Permissions, UserStatus};
+use std::collections::BTreeMap;
use std::fmt::Display;
use tracing::{debug, error, info};
#[derive(Debug, Clone)]
pub struct SystemState {
- pub streams: AHashMap<u32, StreamState>,
+ pub streams: BTreeMap<u32, StreamState>,
pub users: AHashMap<u32, UserState>,
}
impl SystemState {
- pub fn decompose(self) -> (AHashMap<u32, StreamState>, AHashMap<u32,
UserState>) {
+ pub fn decompose(self) -> (BTreeMap<u32, StreamState>, AHashMap<u32,
UserState>) {
(self.streams, self.users)
}
}
@@ -50,15 +51,15 @@ pub struct StreamState {
pub id: u32,
pub name: String,
pub created_at: IggyTimestamp,
- pub topics: AHashMap<u32, TopicState>,
+ pub topics: BTreeMap<u32, TopicState>,
}
#[derive(Debug, Clone)]
pub struct TopicState {
pub id: u32,
pub name: String,
- pub partitions: AHashMap<u32, PartitionState>,
- pub consumer_groups: AHashMap<u32, ConsumerGroupState>,
+ pub partitions: BTreeMap<u32, PartitionState>,
+ pub consumer_groups: BTreeMap<u32, ConsumerGroupState>,
pub compression_algorithm: CompressionAlgorithm,
pub message_expiry: IggyExpiry,
pub max_topic_size: MaxTopicSize,
@@ -161,7 +162,7 @@ impl SystemState {
}
pub async fn init(entries: Vec<StateEntry>) -> Result<Self, IggyError> {
- let mut streams = AHashMap::new();
+ let mut streams = BTreeMap::new();
let mut users = AHashMap::new();
for entry in entries {
debug!("Processing state entry: {entry}",);
@@ -177,7 +178,7 @@ impl SystemState {
let stream = StreamState {
id: stream_id,
name: command.name.clone(),
- topics: AHashMap::new(),
+ topics: BTreeMap::new(),
created_at: entry.timestamp,
};
streams.insert(stream.id, stream);
@@ -210,15 +211,15 @@ impl SystemState {
let topic = TopicState {
id: topic_id,
name: command.name,
- consumer_groups: AHashMap::new(),
+ consumer_groups: BTreeMap::new(),
compression_algorithm: command.compression_algorithm,
message_expiry: command.message_expiry,
max_topic_size: command.max_topic_size,
replication_factor: command.replication_factor,
created_at: entry.timestamp,
partitions: if command.partitions_count > 0 {
- let mut partitions = AHashMap::new();
- for i in 1..=command.partitions_count {
+ let mut partitions = BTreeMap::new();
+ for i in 0..command.partitions_count {
partitions.insert(
i,
PartitionState {
@@ -229,7 +230,7 @@ impl SystemState {
}
partitions
} else {
- AHashMap::new()
+ BTreeMap::new()
},
};
stream.topics.insert(topic.id, topic);
@@ -488,7 +489,7 @@ impl SystemState {
}
}
-fn find_stream_id(streams: &AHashMap<u32, StreamState>, stream_id:
&Identifier) -> u32 {
+fn find_stream_id(streams: &BTreeMap<u32, StreamState>, stream_id:
&Identifier) -> u32 {
match stream_id.kind {
IdKind::Numeric => stream_id
.get_u32_value()
@@ -506,7 +507,7 @@ fn find_stream_id(streams: &AHashMap<u32, StreamState>,
stream_id: &Identifier)
}
}
-fn find_topic_id(topics: &AHashMap<u32, TopicState>, topic_id: &Identifier) ->
u32 {
+fn find_topic_id(topics: &BTreeMap<u32, TopicState>, topic_id: &Identifier) ->
u32 {
match topic_id.kind {
IdKind::Numeric => topic_id
.get_u32_value()
@@ -525,7 +526,7 @@ fn find_topic_id(topics: &AHashMap<u32, TopicState>,
topic_id: &Identifier) -> u
}
fn find_consumer_group_id(
- groups: &AHashMap<u32, ConsumerGroupState>,
+ groups: &BTreeMap<u32, ConsumerGroupState>,
group_id: &Identifier,
) -> u32 {
match group_id.kind {
diff --git a/core/server/src/streaming/partitions/helpers.rs
b/core/server/src/streaming/partitions/helpers.rs
index 8ce35655..c16020d8 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -472,9 +472,6 @@ pub fn get_segment_range_by_timestamp(
) -> impl FnOnce(ComponentsById<PartitionRef>) ->
Result<std::ops::Range<usize>, IggyError> {
move |(.., log)| -> Result<std::ops::Range<usize>, IggyError> {
let segments = log.segments();
- for seg in segments {
- tracing::warn!("timestamp: {}, segment: {:?}", timestamp, seg);
- }
let start = log
.segments()
.iter()
@@ -705,9 +702,6 @@ pub fn commit_journal() -> impl
FnOnce(ComponentsById<PartitionRefMut>) -> IggyM
let batches = log.journal_mut().commit();
log.ensure_indexes();
batches.append_indexes_to(log.active_indexes_mut().unwrap());
- let indexes = log.active_indexes_mut().unwrap();
- let first = indexes.get(0).unwrap();
- let last = indexes.last().unwrap();
batches
}
}
diff --git a/core/server/src/streaming/partitions/log.rs
b/core/server/src/streaming/partitions/log.rs
index 83712975..ebee527e 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -28,15 +28,6 @@ where
storage: Vec<Storage>,
}
-impl<J> Clone for SegmentedLog<J>
-where
- J: Journal + Default + Debug + Clone,
-{
- fn clone(&self) -> Self {
- Default::default()
- }
-}
-
impl<J> Default for SegmentedLog<J>
where
J: Journal + Debug + Default,
@@ -135,6 +126,12 @@ where
self.storage.push(storage);
self.indexes.push(None);
}
+
+ pub fn set_segment_indexes(&mut self, segment_index: usize, indexes:
IggyIndexesMut) {
+ if let Some(segment_indexes) = self.indexes.get_mut(segment_index) {
+ *segment_indexes = Some(indexes);
+ }
+ }
}
impl<J> SegmentedLog<J>
diff --git a/core/server/src/streaming/partitions/mod.rs
b/core/server/src/streaming/partitions/mod.rs
index b3863f4d..28d91e4f 100644
--- a/core/server/src/streaming/partitions/mod.rs
+++ b/core/server/src/streaming/partitions/mod.rs
@@ -23,7 +23,6 @@ pub mod log;
pub mod messages;
pub mod partition2;
pub mod segments;
-pub mod storage;
pub mod storage2;
pub const COMPONENT: &str = "STREAMING_PARTITIONS";
diff --git a/core/server/src/streaming/partitions/partition2.rs
b/core/server/src/streaming/partitions/partition2.rs
index a88bac02..92d8f738 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -131,7 +131,7 @@ impl Clone for Partition {
offset: Arc::clone(&self.offset),
consumer_offset: Arc::clone(&self.consumer_offset),
consumer_group_offset: Arc::clone(&self.consumer_group_offset),
- log: self.log.clone(),
+ log: Default::default(),
}
}
}
diff --git a/core/server/src/streaming/partitions/storage.rs
b/core/server/src/streaming/partitions/storage.rs
deleted file mode 100644
index 683b50a0..00000000
--- a/core/server/src/streaming/partitions/storage.rs
+++ /dev/null
@@ -1,266 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::compat::index_rebuilding::index_rebuilder::IndexRebuilder;
-use crate::configs::cache_indexes::CacheIndexesConfig;
-use crate::io::fs_utils;
-use crate::state::system::PartitionState;
-use crate::streaming::partitions::COMPONENT;
-use crate::streaming::partitions::consumer_offset::ConsumerOffset;
-use crate::streaming::persistence::persister::PersisterKind;
-use crate::streaming::segments::*;
-use crate::streaming::utils::file;
-use compio::fs;
-use compio::fs::create_dir_all;
-use compio::io::AsyncReadExt;
-use error_set::ErrContext;
-use iggy_common::ConsumerKind;
-use iggy_common::IggyError;
-use std::path::Path;
-use std::sync::Arc;
-use std::sync::atomic::Ordering;
-use tracing::{error, info, trace, warn};
-
-#[derive(Debug)]
-pub struct FilePartitionStorage {
- persister: Arc<PersisterKind>,
-}
-
-impl FilePartitionStorage {
- pub fn new(persister: Arc<PersisterKind>) -> Self {
- Self { persister }
- }
-}
-/*
-
-impl PartitionStorage for FilePartitionStorage {
- async fn load(
- &self,
- partition: &mut Partition,
- state: PartitionState,
- ) -> Result<(), IggyError> {
- info!(
- "Loading partition with ID: {} for stream with ID: {} and topic
with ID: {}, for path: {} from disk...",
- partition.partition_id,
- partition.stream_id,
- partition.topic_id,
- partition.partition_path
- );
- partition.created_at = state.created_at;
- // TODO: Replace this with the dir walk impl, that is mentined
- // in the main function.
- let mut dir_entries = std::fs::read_dir(&partition.partition_path)
- .with_error_context(|error| format!(
- "{COMPONENT} (error: {error}) - failed to read partition
with ID: {} for stream with ID: {} and topic with ID: {} and path: {}.",
- partition.partition_id, partition.stream_id,
partition.topic_id, partition.partition_path,
- ))
- .map_err(|_| IggyError::CannotReadPartitions)?;
-
- let mut log_files = Vec::new();
- while let Some(dir_entry) = dir_entries.next() {
- let dir_entry = dir_entry.unwrap();
- let path = dir_entry.path();
- let extension = path.extension();
- if extension.is_none() || extension.unwrap() != LOG_EXTENSION {
- continue;
- }
- let metadata = dir_entry.metadata().unwrap();
- if metadata.is_dir() {
- continue;
- }
- log_files.push(dir_entry);
- }
-
- log_files.sort_by_key(|a| a.file_name());
-
- for dir_entry in log_files {
- let log_file_name = dir_entry
- .file_name()
- .into_string()
- .unwrap()
- .replace(&format!(".{LOG_EXTENSION}"), "");
-
- let start_offset = log_file_name.parse::<u64>().unwrap();
- let mut segment = Segment::create(
- partition.stream_id,
- partition.topic_id,
- partition.partition_id,
- start_offset,
- partition.config.clone(),
- partition.message_expiry,
- partition.size_of_parent_stream.clone(),
- partition.size_of_parent_topic.clone(),
- partition.size_bytes.clone(),
- partition.messages_count_of_parent_stream.clone(),
- partition.messages_count_of_parent_topic.clone(),
- partition.messages_count.clone(),
- false,
- );
-
- let index_path = segment.index_file_path().to_owned();
- let messages_file_path = segment.messages_file_path().to_owned();
- let time_index_path = index_path.replace(INDEX_EXTENSION,
"timeindex");
-
- // TODO: Move to fs_utils
- async fn try_exists(index_path: &str) -> Result<bool,
std::io::Error> {
- match compio::fs::metadata(index_path).await {
- Ok(_) => Ok(true),
- Err(err) => match err.kind() {
- std::io::ErrorKind::NotFound => Ok(false),
- _ => Err(err),
- },
- }
- }
- let index_path_exists = try_exists(&index_path).await.unwrap();
- let time_index_path_exists =
try_exists(&time_index_path).await.unwrap();
- let index_cache_enabled = matches!(
- partition.config.segment.cache_indexes,
- CacheIndexesConfig::All | CacheIndexesConfig::OpenSegment
- );
-
- // Rebuild indexes if index cache is enabled and index at path
does not exists.
- if index_cache_enabled && (!index_path_exists ||
time_index_path_exists) {
- warn!(
- "Index at path {} does not exist, rebuilding it based on
{}...",
- index_path, messages_file_path
- );
- let now = tokio::time::Instant::now();
- let index_rebuilder = IndexRebuilder::new(
- messages_file_path.clone(),
- index_path.clone(),
- start_offset,
- );
- index_rebuilder.rebuild().await.unwrap_or_else(|e| {
- panic!(
- "Failed to rebuild index for partition with ID: {} for
- stream with ID: {} and topic with ID: {}. Error: {e}",
- partition.partition_id, partition.stream_id,
partition.topic_id,
- )
- });
- info!(
- "Rebuilding index for path {} finished, it took {} ms",
- index_path,
- now.elapsed().as_millis()
- );
- }
-
- if time_index_path_exists {
- tokio::fs::remove_file(&time_index_path).await.unwrap();
- }
-
- segment.load_from_disk().await.with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to load
segment: {segment}",)
- })?;
-
- // If the first segment has at least a single message, we should
increment the offset.
- if !partition.should_increment_offset {
- partition.should_increment_offset =
segment.get_messages_size() > 0;
- }
-
- if partition.config.partition.validate_checksum {
- info!(
- "Validating messages checksum for partition with ID: {}
and segment with start offset: {}...",
- partition.partition_id,
- segment.start_offset()
- );
- segment.validate_messages_checksums().await?;
- info!(
- "Validated messages checksum for partition with ID: {} and
segment with start offset: {}.",
- partition.partition_id,
- segment.start_offset()
- );
- }
-
- // Load the unique message IDs for the partition if the
deduplication feature is enabled.
- let mut unique_message_ids_count = 0;
- if let Some(message_deduplicator) =
&partition.message_deduplicator {
- let max_entries =
partition.config.message_deduplication.max_entries as u32;
- info!(
- "Loading {max_entries} unique message IDs for partition
with ID: {} and segment with start offset: {}...",
- partition.partition_id,
- segment.start_offset()
- );
- let message_ids =
segment.load_message_ids(max_entries).await.with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to load
message ids, segment: {segment}",)
- })?;
- for message_id in message_ids {
- if message_deduplicator.try_insert(message_id).await {
- unique_message_ids_count += 1;
- } else {
- warn!(
- "Duplicated message ID: {} for partition with ID:
{} and segment with start offset: {}.",
- message_id,
- partition.partition_id,
- segment.start_offset()
- );
- }
- }
- info!(
- "Loaded: {} unique message IDs for partition with ID: {}
and segment with start offset: {}...",
- unique_message_ids_count,
- partition.partition_id,
- segment.start_offset()
- );
- }
-
- if CacheIndexesConfig::None ==
partition.config.segment.cache_indexes {
- segment.drop_indexes();
- }
-
- partition
- .segments_count_of_parent_stream
- .fetch_add(1, Ordering::SeqCst);
- partition.segments.push(segment);
- }
-
- if !partition.segments.is_empty() {
- let last_segment = partition.segments.last_mut().unwrap();
- partition.current_offset = last_segment.end_offset();
- }
-
- // If cache_indexes is OpenSegment, clear all segment indexes except
the last one
- if matches!(
- partition.config.segment.cache_indexes,
- CacheIndexesConfig::OpenSegment
- ) && !partition.segments.is_empty()
- {
- let segments_count = partition.segments.len();
- for i in 0..segments_count - 1 {
- partition.segments[i].drop_indexes();
- }
- }
-
- partition
- .load_consumer_offsets()
- .await
- .with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to load
consumer offsets, partition: {partition}",)
- })?;
- info!(
- "Loaded partition with ID: {} for stream with ID: {} and topic
with ID: {}, current offset: {}.",
- partition.partition_id,
- partition.stream_id,
- partition.topic_id,
- partition.current_offset
- );
-
- Ok(())
- }
-}
-
-*/
diff --git a/core/server/src/streaming/segments/mod.rs
b/core/server/src/streaming/segments/mod.rs
index 000fc972..6a2b28fc 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -18,21 +18,16 @@
mod indexes;
mod messages;
-mod reading_messages;
-mod segment;
mod segment2;
mod types;
pub mod storage;
pub use indexes::IggyIndexesMut;
-pub use segment::Segment;
pub use segment2::Segment2;
pub use types::IggyMessageHeaderViewMut;
pub use types::IggyMessageViewMut;
pub use types::IggyMessagesBatchMut;
pub use types::IggyMessagesBatchSet;
-pub const LOG_EXTENSION: &str = "log";
-pub const INDEX_EXTENSION: &str = "index";
pub const SEGMENT_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 1024;
diff --git a/core/server/src/streaming/segments/reading_messages.rs
b/core/server/src/streaming/segments/reading_messages.rs
deleted file mode 100644
index 514cc353..00000000
--- a/core/server/src/streaming/segments/reading_messages.rs
+++ /dev/null
@@ -1,401 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet};
-use crate::streaming::segments::segment::Segment;
-use error_set::ErrContext;
-use iggy_common::{IggyByteSize, IggyError};
-use std::sync::atomic::Ordering;
-use tracing::{error, trace};
-
-const COMPONENT: &str = "STREAMING_SEGMENT";
-
-/*
-impl Segment {
- pub fn get_messages_size(&self) -> IggyByteSize {
- let on_disk_size = self.messages_size.load(Ordering::Relaxed);
- let accumulator_size = self.accumulator.size() as u64;
- IggyByteSize::from(on_disk_size + accumulator_size)
- }
-
- pub fn get_messages_count(&self) -> u32 {
- if self.get_messages_size() == 0 {
- return 0;
- }
-
- (self.end_offset - self.start_offset + 1) as u32
- }
-
- pub async fn get_messages_by_timestamp(
- &self,
- timestamp: u64,
- count: u32,
- ) -> Result<IggyMessagesBatchSet, IggyError> {
- if count == 0 {
- return Ok(IggyMessagesBatchSet::default());
- }
-
- trace!(
- "Getting {count} messages by timestamp {timestamp},
current_offset: {}...",
- self.end_offset
- );
-
- // Case 0: Accumulator is empty, so all messages have to be on disk
- if self.accumulator.is_empty() {
- return Ok(IggyMessagesBatchSet::from(
- self.load_messages_from_disk_by_timestamp(timestamp, count)
- .await?,
- ));
- }
-
- let accumulator_first_timestamp = self.accumulator.first_timestamp();
- let accumulator_last_timestamp = self.accumulator.last_timestamp();
-
- // Case 1: Requested timestamp is higher than any available timestamp
- if timestamp > accumulator_last_timestamp {
- return Ok(IggyMessagesBatchSet::empty());
- }
-
- // Case 2: Requested timestamp falls within accumulator range only
- if timestamp >= accumulator_first_timestamp {
- // Get all messages from accumulator with timestamp >= the
requested timestamp
- return Ok(self.accumulator.get_messages_by_timestamp(timestamp,
count));
- }
-
- // Case 3: Timestamp is lower than accumulator's first timestamp
- // Need to get messages from disk and potentially combine with
accumulator
- let messages_from_disk = IggyMessagesBatchSet::from(self
- .load_messages_from_disk_by_timestamp(timestamp, count)
- .await
- .with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to load messages
from disk by timestamp, stream ID: {}, topic ID: {}, partition ID: {},
timestamp: {timestamp}",
- self.stream_id, self.topic_id, self.partition_id
- )
- })?);
-
- // If we got enough messages from disk or there are no messages from
disk,
- // we don't need to consider messages from the accumulator
- if messages_from_disk.count() >= count {
- return Ok(messages_from_disk);
- }
-
- // If we need more messages, get them from accumulator, respecting the
original timestamp
- // This ensures we don't miss messages with the same or very close
timestamps
- let remaining_count = count - messages_from_disk.count();
- let accumulator_messages = self
- .accumulator
- .get_messages_by_timestamp(timestamp, remaining_count);
-
- // Combine the messages
- let mut out = messages_from_disk;
- out.add_batch_set(accumulator_messages);
-
- Ok(out)
- }
-
- pub async fn get_messages_by_offset(
- &self,
- mut offset: u64,
- count: u32,
- ) -> Result<IggyMessagesBatchSet, IggyError> {
- if count == 0 {
- return Ok(IggyMessagesBatchSet::default());
- }
-
- if offset < self.start_offset {
- offset = self.start_offset;
- }
-
- let mut end_offset = offset + (count - 1) as u64;
- if end_offset > self.end_offset {
- end_offset = self.end_offset;
- }
-
- trace!(
- "Getting messages by offset: {}, count: {}, segment start_offset:
{}, segment end_offset: {}",
- offset, count, self.start_offset, self.end_offset
- );
-
- // Case 0: Accumulator is empty, so all messages have to be on disk
- if self.accumulator.is_empty() {
- return self.load_messages_from_disk_by_offset(offset, count).await;
- }
-
- let accumulator_first_msg_offset = self.accumulator.first_offset();
- let accumulator_last_msg_offset = self.accumulator.last_offset();
-
- // Case 1: All messages are in accumulator buffer
- if offset >= accumulator_first_msg_offset && end_offset <=
accumulator_last_msg_offset {
- return Ok(self.accumulator.get_messages_by_offset(offset, count));
- }
-
- // Case 2: All messages are on disk
- if end_offset < accumulator_first_msg_offset {
- return self.load_messages_from_disk_by_offset(offset, count).await;
- }
-
- // Case 3: Messages span disk and accumulator buffer boundary
- // Calculate how many messages we need from disk
- let disk_count = if offset < accumulator_first_msg_offset {
- ((accumulator_first_msg_offset - offset) as u32).min(count)
- } else {
- 0
- };
-
- let mut combined_batch_set = IggyMessagesBatchSet::empty();
-
- // Load messages from disk if needed
- if disk_count > 0 {
- let disk_messages = self
- .load_messages_from_disk_by_offset(offset, disk_count)
- .await
- .with_error_context(|error| {
- format!(
- "STREAMING_SEGMENT (error: {error}) - failed to load
messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start
offset: {offset}, count: {disk_count}",
- self.stream_id, self.topic_id, self.partition_id
- )
- })?;
-
- if !disk_messages.is_empty() {
- combined_batch_set.add_batch_set(disk_messages);
- }
- }
-
- // Calculate how many more messages we need from the accumulator
- let remaining_count = count - combined_batch_set.count();
-
- if remaining_count > 0 {
- let accumulator_start_offset = std::cmp::max(offset,
accumulator_first_msg_offset);
-
- let accumulator_messages = self
- .accumulator
- .get_messages_by_offset(accumulator_start_offset,
remaining_count);
-
- if !accumulator_messages.is_empty() {
- combined_batch_set.add_batch_set(accumulator_messages);
- }
- }
-
- Ok(combined_batch_set)
- }
-
- /// Loads and returns `count` newest message IDs from the log file.
- pub async fn load_message_ids(&self, count: u32) -> Result<Vec<u128>,
IggyError> {
- let messages_count = self.get_messages_count();
- trace!(
- "Loading message IDs for {messages_count} messages from log file:
{}",
- self.messages_path
- );
-
- if count == 0 || messages_count == 0 {
- return Ok(vec![]);
- }
-
- let adjusted_count = std::cmp::min(count, messages_count);
- let relative_start_offset = messages_count - adjusted_count;
-
- let indexes = self
- .load_indexes_by_offset(relative_start_offset, adjusted_count)
- .await?;
-
- if indexes.is_none() {
- return Ok(vec![]);
- }
-
- let indexes = indexes.unwrap();
-
- let ids = self
- .messages_reader
- .as_ref()
- .unwrap()
- .load_all_message_ids_from_disk(indexes, messages_count)
- .await
- .with_error_context(|error| {
- format!("Failed to load message IDs, error: {error} for
{self}")
- })?;
-
- trace!(
- "Loaded {} message IDs from log file: {}",
- ids.len(),
- self.messages_path
- );
- Ok(ids)
- }
-
- pub async fn validate_messages_checksums(&self) -> Result<(), IggyError> {
- let messages_count = self.get_messages_count();
- if messages_count == 0 {
- return Ok(());
- }
-
- const BATCH_COUNT: u32 = 10000;
- let end_offset = self.end_offset;
- let mut current_offset = self.start_offset;
- let mut processed_count = 0;
-
- while current_offset <= end_offset {
- let remaining_count = messages_count - processed_count;
- let batch_count = std::cmp::min(BATCH_COUNT, remaining_count);
-
- let messages_batch = self
- .get_messages_by_offset(current_offset, batch_count)
- .await?;
-
- for batch in messages_batch.iter() {
- batch.validate_checksums().with_error_context(|error| {
- format!("Failed to validate message checksum, error:
{error} for {self}")
- })?;
- processed_count += batch.count();
- }
- current_offset += batch_count as u64;
- }
-
- Ok(())
- }
-
- async fn load_indexes_by_offset(
- &self,
- relative_start_offset: u32,
- count: u32,
- ) -> Result<Option<IggyIndexesMut>, IggyError> {
- let indexes = if let Some(ref indexes) = self.indexes {
- if !indexes.is_empty() {
- indexes.slice_by_offset(relative_start_offset, count)
- } else {
- self.index_reader
- .as_ref()
- .expect("Index reader not initialized")
- .load_from_disk_by_offset(relative_start_offset, count)
- .await?
- }
- } else {
- self.index_reader
- .as_ref()
- .expect("Index reader not initialized")
- .load_from_disk_by_offset(relative_start_offset, count)
- .await?
- };
- Ok(indexes)
- }
-
- async fn load_indexes_by_timestamp(
- &self,
- timestamp: u64,
- count: u32,
- ) -> Result<Option<IggyIndexesMut>, IggyError> {
- let indexes = if let Some(ref indexes) = self.indexes {
- if !indexes.is_empty() {
- indexes.slice_by_timestamp(timestamp, count)
- } else {
- self.index_reader
- .as_ref()
- .unwrap()
- .load_from_disk_by_timestamp(timestamp, count)
- .await?
- }
- } else {
- self.index_reader
- .as_ref()
- .unwrap()
- .load_from_disk_by_timestamp(timestamp, count)
- .await?
- };
- Ok(indexes)
- }
-
- async fn load_messages_from_disk_by_offset(
- &self,
- start_offset: u64,
- count: u32,
- ) -> Result<IggyMessagesBatchSet, IggyError> {
- tracing::trace!(
- "Loading {count} messages from disk, start_offset: {start_offset},
segment start_offset: {}, segment end_offset: {}...",
- self.start_offset,
- self.end_offset
- );
- let relative_start_offset = (start_offset - self.start_offset) as u32;
-
- let indexes_to_read = self
- .load_indexes_by_offset(relative_start_offset, count)
- .await?;
-
- if indexes_to_read.is_none() {
- return Ok(IggyMessagesBatchSet::empty());
- }
- let indexes_to_read = indexes_to_read.unwrap();
-
- let batch = self
- .messages_reader
- .as_ref()
- .expect("Messages reader not initialized")
- .load_messages_from_disk(indexes_to_read)
- .await
- .with_error_context(|error| {
- format!("Failed to load messages from segment file: {self}.
{error}")
- })?;
-
- batch
- .validate_checksums_and_offsets(start_offset)
- .with_error_context(|error| {
- format!(
- "Failed to validate messages read from disk! error:
{error}, file: {}",
- self.messages_path
- )
- })?;
-
- tracing::trace!(
- "Loaded {} messages ({} bytes) from disk (requested {count}
messages), start_offset: {start_offset}, end_offset: {}",
- batch.count(),
- batch.size(),
- self.end_offset
- );
-
- Ok(IggyMessagesBatchSet::from(batch))
- }
-
- async fn load_messages_from_disk_by_timestamp(
- &self,
- timestamp: u64,
- count: u32,
- ) -> Result<IggyMessagesBatchMut, IggyError> {
- tracing::trace!(
- "Loading {count} messages from disk, timestamp: {timestamp},
current_timestamp: {}...",
- self.end_timestamp
- );
-
- let indexes_to_read = self.load_indexes_by_timestamp(timestamp,
count).await?;
-
- if indexes_to_read.is_none() {
- return Ok(IggyMessagesBatchMut::empty());
- }
-
- let indexes_to_read = indexes_to_read.unwrap();
-
- self.messages_reader
- .as_ref()
- .expect("Messages reader not initialized")
- .load_messages_from_disk(indexes_to_read)
- .await
- .with_error_context(|error| {
- format!("Failed to load messages from segment file by
timestamp: {self}. {error}")
- })
- }
-}
-
-*/
diff --git a/core/server/src/streaming/segments/segment.rs
b/core/server/src/streaming/segments/segment.rs
deleted file mode 100644
index 1c0311b3..00000000
--- a/core/server/src/streaming/segments/segment.rs
+++ /dev/null
@@ -1,529 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::indexes::*;
-use super::messages::*;
-use crate::configs::system::SystemConfig;
-use crate::streaming::segments::*;
-use compio::fs::remove_file;
-use error_set::ErrContext;
-use iggy_common::INDEX_SIZE;
-use iggy_common::IggyByteSize;
-use iggy_common::IggyError;
-use iggy_common::IggyExpiry;
-use iggy_common::IggyTimestamp;
-use std::rc::Rc;
-use std::sync::Arc;
-use std::sync::atomic::{AtomicU64, Ordering};
-use tracing::error;
-use tracing::{info, warn};
-
-const SIZE_16MB: usize = 16 * 1024 * 1024;
-
-#[derive(Debug)]
-pub struct Segment {
- pub(super) stream_id: u32,
- pub(super) topic_id: u32,
- pub(super) partition_id: u32,
- pub(super) start_offset: u64,
- pub(super) start_timestamp: u64, // first message timestamp
- pub(super) end_timestamp: u64, // last message timestamp
- pub(super) end_offset: u64,
- pub(super) index_path: String,
- pub(super) messages_path: String,
- pub(super) last_index_position: u32,
- pub(super) max_size_bytes: IggyByteSize,
- pub(super) size_of_parent_stream: Arc<AtomicU64>,
- pub(super) size_of_parent_topic: Arc<AtomicU64>,
- pub(super) size_of_parent_partition: Arc<AtomicU64>,
- pub(super) messages_count_of_parent_stream: Arc<AtomicU64>,
- pub(super) messages_count_of_parent_topic: Arc<AtomicU64>,
- pub(super) messages_count_of_parent_partition: Arc<AtomicU64>,
- pub(super) is_closed: bool,
- pub(super) messages_writer: Option<MessagesWriter>,
- pub(super) messages_reader: Option<MessagesReader>,
- pub(super) index_writer: Option<IndexWriter>,
- pub(super) index_reader: Option<IndexReader>,
- pub(super) message_expiry: IggyExpiry,
- pub(super) config: Arc<SystemConfig>,
- pub(super) indexes: Option<IggyIndexesMut>,
- pub(super) messages_size: Arc<AtomicU64>,
- pub(super) indexes_size: Arc<AtomicU64>,
-}
-/*
-
-impl Segment {
- #[allow(clippy::too_many_arguments)]
- pub fn create(
- stream_id: u32,
- topic_id: u32,
- partition_id: u32,
- start_offset: u64,
- config: Arc<SystemConfig>,
- message_expiry: IggyExpiry,
- size_of_parent_stream: Arc<AtomicU64>,
- size_of_parent_topic: Arc<AtomicU64>,
- size_of_parent_partition: Arc<AtomicU64>,
- messages_count_of_parent_stream: Arc<AtomicU64>,
- messages_count_of_parent_topic: Arc<AtomicU64>,
- messages_count_of_parent_partition: Arc<AtomicU64>,
- fresh: bool, // `fresh` means created and persisted in this runtime,
in other words it's set to false when loading from disk
- ) -> Segment {
- let path = config.get_segment_path(
- stream_id as usize,
- topic_id as usize,
- partition_id as usize,
- start_offset,
- );
- let messages_path = Self::get_messages_file_path(&path);
- let index_path = Self::get_index_path(&path);
- let message_expiry = match message_expiry {
- IggyExpiry::ServerDefault => config.segment.message_expiry,
- _ => message_expiry,
- };
- Segment {
- stream_id,
- topic_id,
- partition_id,
- start_offset,
- start_timestamp: IggyTimestamp::now().as_micros(),
- end_timestamp: IggyTimestamp::now().as_micros(),
- end_offset: start_offset,
- messages_path,
- index_path,
- last_index_position: 0,
- max_size_bytes: config.segment.size,
- message_expiry,
- indexes: None,
- is_closed: false,
- messages_writer: None,
- messages_reader: None,
- index_writer: None,
- index_reader: None,
- size_of_parent_stream,
- size_of_parent_partition,
- size_of_parent_topic,
- messages_count_of_parent_stream,
- messages_count_of_parent_topic,
- messages_count_of_parent_partition,
- config,
- messages_size: Arc::new(AtomicU64::new(0)),
- indexes_size: Arc::new(AtomicU64::new(0)),
- }
- }
-
- /// Load the segment state from disk.
- pub async fn load_from_disk(&mut self) -> Result<(), IggyError> {
- if self.messages_reader.is_none() || self.index_reader.is_none() {
- self.initialize_writing(true).await?;
- self.initialize_reading().await?;
- }
-
- let log_size_bytes = self.messages_size.load(Ordering::Acquire);
- info!(
- "Loading segment from disk: messages_file_path: {}, index_path:
{}, log_size: {}",
- self.messages_path,
- self.index_path,
- IggyByteSize::from(log_size_bytes)
- );
-
- self.last_index_position = log_size_bytes as _;
-
- let loaded_indexes = self
- .index_reader
- .as_ref()
- .unwrap()
- .load_all_indexes_from_disk()
- .await
- .with_error_context(|error| format!("Failed to load indexes for
{self}. {error}"))
- .map_err(|_| IggyError::CannotReadFile)?;
-
- info!(
- "Loaded {} indexes for segment with start offset: {}, end offset:
{}, and partition with ID: {}, topic with ID: {}, and stream with ID: {}.",
- self.indexes.as_ref().map_or(0, |idx| idx.count()),
- self.start_offset,
- self.end_offset,
- self.partition_id,
- self.topic_id,
- self.stream_id
- );
-
- let last_index_offset = if loaded_indexes.is_empty() {
- 0_u64
- } else {
- loaded_indexes.last().unwrap().offset() as u64
- };
-
- self.end_offset = self.start_offset + last_index_offset;
-
- info!(
- "Loaded {} indexes for segment with start offset: {}, end offset:
{}, and partition with ID: {}, topic with ID: {}, and stream with ID: {}.",
- self.indexes.as_ref().map_or(0, |idx| idx.count()),
- self.start_offset,
- self.end_offset,
- self.partition_id,
- self.topic_id,
- self.stream_id
- );
-
- if self.is_full().await {
- self.is_closed = true;
- }
-
- let messages_count = self.get_messages_count() as u64;
-
- info!(
- "Loaded segment with log file of size {} ({} messages) for start
offset {}, end offset: {}, and partition with ID: {} for topic with ID: {} and
stream with ID: {}.",
- IggyByteSize::from(log_size_bytes),
- messages_count,
- self.start_offset,
- self.end_offset,
- self.partition_id,
- self.topic_id,
- self.stream_id
- );
-
- self.size_of_parent_stream
- .fetch_add(log_size_bytes, Ordering::SeqCst);
- self.size_of_parent_topic
- .fetch_add(log_size_bytes, Ordering::SeqCst);
- self.size_of_parent_partition
- .fetch_add(log_size_bytes, Ordering::SeqCst);
- self.messages_count_of_parent_stream
- .fetch_add(messages_count, Ordering::SeqCst);
- self.messages_count_of_parent_topic
- .fetch_add(messages_count, Ordering::SeqCst);
- self.messages_count_of_parent_partition
- .fetch_add(messages_count, Ordering::SeqCst);
-
- Ok(())
- }
-
- /// Open segment.
- pub async fn open(&mut self) -> Result<(), IggyError> {
- info!(
- "Saving segment with start offset: {} for partition with ID: {}
for topic with ID: {} and stream with ID: {}",
- self.start_offset, self.partition_id, self.topic_id, self.stream_id
- );
- self.initialize_writing(false).await?;
- self.initialize_reading().await?;
- info!(
- "Saved segment log file with start offset: {} for partition with
ID: {} for topic with ID: {} and stream with ID: {}",
- self.start_offset, self.partition_id, self.topic_id, self.stream_id
- );
- Ok(())
- }
-
- pub async fn initialize_writing(&mut self, file_exists: bool) ->
Result<(), IggyError> {
- let log_fsync = self.config.partition.enforce_fsync;
- let index_fsync = self.config.partition.enforce_fsync;
-
- let messages_writer = MessagesWriter::new(
- &self.messages_path,
- self.messages_size.clone(),
- log_fsync,
- file_exists,
- )
- .await?;
-
- let index_writer = IndexWriter::new(
- &self.index_path,
- self.indexes_size.clone(),
- index_fsync,
- file_exists,
- )
- .await?;
-
- self.messages_writer = Some(messages_writer);
- self.index_writer = Some(index_writer);
- Ok(())
- }
-
- pub async fn initialize_reading(&mut self) -> Result<(), IggyError> {
- let messages_reader =
- MessagesReader::new(&self.messages_path,
self.messages_size.clone()).await?;
- self.messages_reader = Some(messages_reader);
-
- let index_reader = IndexReader::new(&self.index_path,
self.indexes_size.clone()).await?;
- self.index_reader = Some(index_reader);
-
- Ok(())
- }
-
- pub async fn is_full(&self) -> bool {
- if self.get_messages_size() >= self.max_size_bytes {
- return true;
- }
-
- self.is_expired(IggyTimestamp::now()).await
- }
-
- pub async fn is_expired(&self, now: IggyTimestamp) -> bool {
- if !self.is_closed {
- return false;
- }
-
- match self.message_expiry {
- IggyExpiry::NeverExpire => false,
- IggyExpiry::ServerDefault => false,
- IggyExpiry::ExpireDuration(expiry) => {
- let last_messages =
self.get_messages_by_offset(self.end_offset, 1).await;
- if last_messages.is_err() {
- return false;
- }
-
- let last_messages = last_messages.unwrap();
- if last_messages.is_empty() {
- return false;
- }
-
- let last_message =
last_messages.iter().last().unwrap().iter().last().unwrap();
- let last_message_timestamp = last_message.header().timestamp();
- last_message_timestamp + expiry.as_micros() <= now.as_micros()
- }
- }
- }
-
- pub async fn shutdown_reading(&mut self) {
- if let Some(log_reader) = self.messages_reader.take() {
- drop(log_reader);
- }
- if let Some(index_reader) = self.index_reader.take() {
- drop(index_reader);
- }
- }
-
- pub async fn delete(&mut self) -> Result<(), IggyError> {
- let segment_size = self.get_messages_size();
- let segment_count_of_messages = self.get_messages_count() as u64;
- info!(
- "Deleting segment of size {segment_size}
({segment_count_of_messages} messages) with start offset: {} for partition with
ID: {} for stream with ID: {} and topic with ID: {}...",
- self.start_offset, self.partition_id, self.stream_id,
self.topic_id,
- );
-
- self.shutdown_reading().await;
-
- if !self.is_closed {
- self.shutdown_writing().await;
- }
-
- let _ = remove_file(&self.messages_path)
- .await
- .with_error_context(|error| {
- format!("Failed to delete log file: {}. {error}",
self.messages_path)
- });
- let _ = remove_file(&self.index_path)
- .await
- .with_error_context(|error| {
- format!("Failed to delete index file: {}. {error}",
self.index_path)
- });
-
- let segment_size_bytes = segment_size.as_bytes_u64();
- self.size_of_parent_stream
- .fetch_sub(segment_size_bytes, Ordering::SeqCst);
- self.size_of_parent_topic
- .fetch_sub(segment_size_bytes, Ordering::SeqCst);
- self.size_of_parent_partition
- .fetch_sub(segment_size_bytes, Ordering::SeqCst);
- self.messages_count_of_parent_stream
- .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
- self.messages_count_of_parent_topic
- .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
- self.messages_count_of_parent_partition
- .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
-
- info!(
- "Deleted segment of size {segment_size} with start offset: {} for
partition with ID: {} for stream with ID: {} and topic with ID: {}.",
- self.start_offset, self.partition_id, self.stream_id,
self.topic_id,
- );
-
- Ok(())
- }
-
- fn get_messages_file_path(path: &str) -> String {
- format!("{path}.{LOG_EXTENSION}")
- }
-
- fn get_index_path(path: &str) -> String {
- format!("{path}.{INDEX_EXTENSION}")
- }
-
- pub fn update_message_expiry(&mut self, message_expiry: IggyExpiry) {
- self.message_expiry = message_expiry;
- }
-
- /// Ensure indexes are initialized with proper capacity
- pub fn ensure_indexes(&mut self) {
- if self.indexes.is_none() {
- let capacity = SIZE_16MB / INDEX_SIZE;
- self.indexes = Some(IggyIndexesMut::with_capacity(capacity, 0));
- }
- }
-
- pub fn is_closed(&self) -> bool {
- self.is_closed
- }
-
- pub fn start_offset(&self) -> u64 {
- self.start_offset
- }
-
- pub fn end_offset(&self) -> u64 {
- self.end_offset
- }
-
- pub fn end_timestamp(&self) -> u64 {
- self.end_timestamp
- }
-
- pub fn index_file_path(&self) -> &str {
- &self.index_path
- }
-
- pub fn partition_id(&self) -> u32 {
- self.partition_id
- }
-
- pub fn messages_file_path(&self) -> &str {
- &self.messages_path
- }
-
- /// Explicitly drop the old indexes to ensure memory is freed
- pub fn drop_indexes(&mut self) {
- self.indexes = None;
- }
-}
-
-impl std::fmt::Display for Segment {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "Segment {{ stream_id: {}, topic_id: {}, partition_id: {},
start_offset: {}, end_offset: {}, size_bytes: {}, last_index_position: {},
max_size_bytes: {}, closed: {} }}",
- self.stream_id,
- self.topic_id,
- self.partition_id,
- self.start_offset,
- self.end_offset,
- self.get_messages_size(),
- self.last_index_position,
- self.max_size_bytes,
- self.is_closed
- )
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::configs::cache_indexes::CacheIndexesConfig;
- use crate::configs::system::SegmentConfig;
- use crate::streaming::utils::MemoryPool;
- use iggy_common::IggyDuration;
-
- #[tokio::test]
- async fn should_be_created_given_valid_parameters() {
- let stream_id = 1usize;
- let topic_id = 2usize;
- let partition_id = 3usize;
- let start_offset = 0;
- let config = Arc::new(SystemConfig::default());
- let path = config.get_segment_path(stream_id, topic_id, partition_id,
start_offset);
- let messages_file_path = Segment::get_messages_file_path(&path);
- let index_path = Segment::get_index_path(&path);
- let message_expiry =
IggyExpiry::ExpireDuration(IggyDuration::from(10));
- let size_of_parent_stream = Arc::new(AtomicU64::new(0));
- let size_of_parent_topic = Arc::new(AtomicU64::new(0));
- let size_of_parent_partition = Arc::new(AtomicU64::new(0));
- let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
- let messages_count_of_parent_topic = Arc::new(AtomicU64::new(0));
- let messages_count_of_parent_partition = Arc::new(AtomicU64::new(0));
- MemoryPool::init_pool(config.clone());
-
- let segment = Segment::create(
- stream_id as u32,
- topic_id as u32,
- partition_id as u32,
- start_offset,
- config,
- message_expiry,
- size_of_parent_stream,
- size_of_parent_topic,
- size_of_parent_partition,
- messages_count_of_parent_stream,
- messages_count_of_parent_topic,
- messages_count_of_parent_partition,
- true,
- );
-
- assert_eq!(segment.stream_id, stream_id as u32);
- assert_eq!(segment.topic_id, topic_id as u32);
- assert_eq!(segment.partition_id, partition_id as u32);
- assert_eq!(segment.start_offset(), start_offset);
- assert_eq!(segment.end_offset(), 0);
- assert_eq!(segment.get_messages_size(), 0);
- assert_eq!(segment.messages_file_path(), messages_file_path);
- assert_eq!(segment.index_file_path(), index_path);
- assert_eq!(segment.message_expiry, message_expiry);
- assert!(segment.indexes.is_none());
- assert!(!segment.is_closed());
- assert!(!segment.is_full().await);
- }
-
- #[tokio::test]
- async fn should_not_initialize_indexes_cache_when_disabled() {
- let stream_id = 1;
- let topic_id = 2;
- let partition_id = 3;
- let start_offset = 0;
- let config = Arc::new(SystemConfig {
- segment: SegmentConfig {
- cache_indexes: CacheIndexesConfig::None,
- ..Default::default()
- },
- ..Default::default()
- });
- let message_expiry = IggyExpiry::NeverExpire;
- let size_of_parent_stream = Arc::new(AtomicU64::new(0));
- let size_of_parent_topic = Arc::new(AtomicU64::new(0));
- let size_of_parent_partition = Arc::new(AtomicU64::new(0));
- let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
- let messages_count_of_parent_topic = Arc::new(AtomicU64::new(0));
- let messages_count_of_parent_partition = Arc::new(AtomicU64::new(0));
- MemoryPool::init_pool(config.clone());
-
- let segment = Segment::create(
- stream_id,
- topic_id,
- partition_id,
- start_offset,
- config,
- message_expiry,
- size_of_parent_stream,
- size_of_parent_topic,
- size_of_parent_partition,
- messages_count_of_parent_stream,
- messages_count_of_parent_topic,
- messages_count_of_parent_partition,
- true,
- );
-
- assert!(segment.indexes.is_none());
- }
-}
-
-*/
diff --git a/core/server/src/streaming/segments/storage.rs
b/core/server/src/streaming/segments/storage.rs
index 737b8947..249b67eb 100644
--- a/core/server/src/streaming/segments/storage.rs
+++ b/core/server/src/streaming/segments/storage.rs
@@ -36,9 +36,13 @@ impl Storage {
let index_writer =
IndexWriter::new(index_path, indexes_size.clone(), index_fsync,
file_exists).await?;
+ if file_exists {
+ messages_writer.fsync().await?;
+ index_writer.fsync().await?;
+ }
+
let messages_reader = MessagesReader::new(messages_path, size).await?;
let index_reader = IndexReader::new(index_path, indexes_size).await?;
-
Ok(Self {
messages_writer: Some(messages_writer),
messages_reader: Some(messages_reader),
@@ -64,7 +68,8 @@ pub async fn create_segment_storage(
indexes_size: u64,
start_offset: u64,
) -> Result<Storage, IggyError> {
- let messages_path = config.get_segment_path(stream_id, topic_id,
partition_id, start_offset);
+ let messages_path =
+ config.get_messages_file_path(stream_id, topic_id, partition_id,
start_offset);
let index_path = config.get_index_path(stream_id, topic_id, partition_id,
start_offset);
let log_fsync = config.partition.enforce_fsync;
let index_fsync = config.partition.enforce_fsync;