hubcio commented on code in PR #3163: URL: https://github.com/apache/iggy/pull/3163#discussion_r3194622918
########## core/server-ng/src/bootstrap.rs: ########## @@ -0,0 +1,1474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::config_writer::write_current_config; +use crate::server_error::ServerNgError; +use configs::server_ng::ServerNgConfig; +use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus}; +use iggy_binary_protocol::RequestHeader; +use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; +use iggy_common::{ + ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, TopicStats, + sharding::LocalIdx, variadic, +}; +use journal::Journal; +use journal::prepare_journal::PrepareJournal; +use message_bus::client_listener::{self, RequestHandler}; +use message_bus::fd_transfer; +use message_bus::installer; +use message_bus::installer::ConnectionInstaller; +use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind}; +use message_bus::replica::io as replica_io; +use message_bus::replica::listener::{self as replica_listener, MessageHandler}; +use message_bus::transports::quic::server_config_with_cert; +use message_bus::transports::tls::{ + TlsServerCredentials, install_default_crypto_provider, load_pem, self_signed_for_loopback, +}; 
+use message_bus::{ + AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, AcceptedTlsClientFn, + AcceptedWsClientFn, IggyMessageBus, connector, +}; +use metadata::IggyMetadata; +use metadata::MuxStateMachine; +use metadata::impls::metadata::{IggySnapshot, StreamsFrontend}; +use metadata::impls::recovery::recover; +use metadata::stm::consumer_group::ConsumerGroups; +use metadata::stm::snapshot::Snapshot; +use metadata::stm::stream::{Partition, Streams}; +use metadata::stm::user::Users; +use partitions::{ + IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, PartitionsConfig, Segment, +}; +// TODO: decouple bootstrap/storage helpers and logging from the `server` crate. +use server::bootstrap::create_directories; +use server::log::logger::Logging; +use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; +use server::streaming::segments::storage::create_segment_storage; +use shard::builder::IggyShardBuilder; +use shard::shards_table::PapayaShardsTable; +use shard::{ + CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, channel, shard_channel, +}; +use std::cell::{Cell, RefCell}; +use std::future::Future; +use std::net::{IpAddr, SocketAddr}; +use std::path::{Path, PathBuf}; +use std::rc::{Rc, Weak}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use tracing::{info, warn}; + +const CLUSTER_ID: u128 = 1; +const SHARD_ID: u16 = 0; +const SHARD_REPLICA_ID: u8 = 0; +const SHARD_NAME: &str = "server-ng-shard-0"; +const SHARD_INBOX_CAPACITY: usize = 1024; + +type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>; +type ServerNgMetadata = IggyMetadata< + VsrConsensus<Rc<IggyMessageBus>>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, +>; +type ServerNgShard = IggyShard< + Rc<IggyMessageBus>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, + PapayaShardsTable, +>; + +type ServerNgShardHandle = 
Rc<RefCell<Option<Weak<ServerNgShard>>>>; + +struct TcpTopology { + self_replica_id: u8, + replica_count: u8, + client_listen_addr: SocketAddr, + replica_listen_addr: Option<SocketAddr>, + ws_listen_addr: Option<SocketAddr>, + quic_listen_addr: Option<SocketAddr>, + tcp_tls_listen_addr: Option<SocketAddr>, + peers: Vec<(u8, SocketAddr)>, +} + +struct LocalClientAcceptFns { + tcp: AcceptedClientFn, + ws: AcceptedWsClientFn, + quic: AcceptedQuicClientFn, + tcp_tls: AcceptedTlsClientFn, +} + +#[derive(Default)] +struct BoundClientListeners { + tcp: Option<SocketAddr>, + tcp_tls: Option<SocketAddr>, + ws: Option<SocketAddr>, + quic: Option<SocketAddr>, +} + +pub trait RunServerNg { + fn run( + &self, + config: &ServerNgConfig, + current_replica_id: Option<u8>, + ) -> impl Future<Output = Result<(), ServerNgError>>; +} + +impl RunServerNg for Rc<ServerNgShard> { + /// Run the fully bootstrapped `server-ng` shard. + /// + /// # Errors + /// + /// Returns an error if TCP listener bootstrap fails or cluster TCP + /// addresses cannot be resolved from config. 
+ async fn run( + &self, + config: &ServerNgConfig, + current_replica_id: Option<u8>, + ) -> Result<(), ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let (stop_tx, stop_rx) = channel(1); + let message_pump_shard = Self::clone(self); + let message_pump_handle = compio::runtime::spawn(async move { + message_pump_shard.run_message_pump(stop_rx).await; + }); + self.bus.track_background(message_pump_handle); + + let on_replica_message = make_replica_message_handler(self); + let on_client_request = make_client_request_handler(self); + let accepted_replica = make_local_replica_accept_fn(&self.bus, on_replica_message); + let accepted_client = make_local_client_accept_fns(&self.bus, on_client_request); + + info!( + shard = self.id, + partitions = self.plane.partitions().len(), + "server-ng shard initialized" + ); + + if let Err(error) = + start_tcp_runtime(self, config, &topology, accepted_replica, accepted_client).await + { + let _ = stop_tx.try_send(()); + return Err(error); + } + + self.bus.token().wait().await; + let _ = stop_tx.try_send(()); + Ok(()) + } +} + +/// Load config, prepare directories, and complete late logging init. +/// +/// # Errors +/// +/// Returns an error if config loading, directory preparation, or logging +/// setup fails. +pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, ServerNgError> { + let config = ServerNgConfig::load() + .await + .map_err(ServerNgError::Config)?; + // TODO: decouple directory bootstrap from the `server` crate. + create_directories(&config.system) + .await + .map_err(ServerNgError::CreateDirectories)?; + logging + .late_init( + config.system.get_system_path(), + &config.system.logging, + &config.telemetry, + ) + .map_err(ServerNgError::Logging)?; + + Ok(config) +} + +/// Bootstraps `server-ng` from config and on-disk metadata/partition state. 
+/// +/// # Errors +/// +/// Returns an error if metadata recovery, consensus restoration, or +/// partition hydration fails. +pub async fn bootstrap( + config: &ServerNgConfig, + current_replica_id: Option<u8>, +) -> Result<Rc<ServerNgShard>, ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config)); + let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path)) + .await + .map_err(ServerNgError::MetadataRecovery)?; + let restored_op = recovered.last_applied_op.unwrap_or_else(|| { + recovered + .snapshot + .as_ref() + .map_or(0, IggySnapshot::sequence_number) + }); + + let metadata = ServerNgMetadata::new( + Some(restore_metadata_consensus( + &recovered.journal, + restored_op, + topology.self_replica_id, + topology.replica_count, + Rc::clone(&bus), + )), + Some(recovered.journal), + recovered.snapshot, + recovered.mux_stm, + Some(PathBuf::from(&config.system.path)), + ); + let shard = build_single_shard(config, &topology, metadata, bus).await?; + info!(shard = shard.id, "server-ng bootstrap complete"); + + Ok(shard) +} + +fn restore_metadata_consensus( + journal: &PrepareJournal, + restored_op: u64, + self_replica_id: u8, + replica_count: u8, + bus: Rc<IggyMessageBus>, +) -> VsrConsensus<Rc<IggyMessageBus>> { + let mut consensus = VsrConsensus::new( + CLUSTER_ID, + self_replica_id, + replica_count, + 0, + bus, + LocalPipeline::new(), + ); + + let last_header = journal + .last_op() + .and_then(|op| usize::try_from(op).ok()) + .and_then(|op| journal.header(op).map(|header| *header)); + if let Some(header) = last_header { + consensus.set_view(header.view); + } + + consensus.init(); + consensus.sequencer().set_sequence(restored_op); + consensus.restore_commit_state(restored_op, restored_op); + if let Some(header) = last_header { + consensus.set_last_prepare_checksum(header.checksum); + consensus.set_log_view(header.view); + } + + consensus +} + +async fn 
build_single_shard( + config: &ServerNgConfig, + topology: &TcpTopology, + metadata: ServerNgMetadata, + bus: Rc<IggyMessageBus>, +) -> Result<Rc<ServerNgShard>, ServerNgError> { + let shard_id = ShardId::new(SHARD_ID); + let partition_count = metadata.mux_stm.streams().read(|inner| { + inner + .items + .iter() + .map(|(_, stream)| { + stream + .topics + .iter() + .map(|(_, topic)| topic.partitions.len()) + .sum::<usize>() + }) + .sum() + }); + let mut partitions = IggyPartitions::with_capacity( + shard_id, + PartitionsConfig { + messages_required_to_save: config.system.partition.messages_required_to_save, + size_of_messages_required_to_save: config + .system + .partition + .size_of_messages_required_to_save, + enforce_fsync: config.system.partition.enforce_fsync, + segment_size: config.system.segment.size, + }, + partition_count, + ); + let shards_table = PapayaShardsTable::with_capacity(partition_count); + + let (topic_stats, namespaces) = metadata.mux_stm.streams().read(|inner| { + let mut topic_stats = Vec::new(); + let mut namespaces = Vec::with_capacity(partition_count); + for (_, stream) in &inner.items { + for (topic_id, topic) in &stream.topics { + topic_stats.push(topic.stats.clone()); + for partition in &topic.partitions { + namespaces.push((stream.id, topic_id, topic.stats.clone(), partition.clone())); + } + } + } + (topic_stats, namespaces) + }); + + for topic_stats in topic_stats { + topic_stats.zero_out_all(); + } + + for (stream_id, topic_id, topic_stats, partition_metadata) in namespaces { + validate_recovered_namespace(config, stream_id, topic_id, partition_metadata.id)?; + let namespace = IggyNamespace::new(stream_id, topic_id, partition_metadata.id); + let partition = load_partition( + config, + namespace, + topic_stats, + &partition_metadata, + topology.self_replica_id, + topology.replica_count, + Rc::clone(&bus), + ) + .await?; + let local_idx = partitions.insert(namespace, partition); + shards_table.insert( + namespace, + 
PartitionLocation::new(shard_id, LocalIdx::new(*local_idx)), + ); + } + + let (sender, inbox) = shard_channel::<()>(SHARD_ID, SHARD_INBOX_CAPACITY); + let senders = vec![sender]; + let shard_handle = Rc::new(RefCell::new(None)); + let on_replica_message = make_deferred_replica_message_handler(&shard_handle); + let on_client_request = make_deferred_client_request_handler(&shard_handle); + let built = IggyShardBuilder::new( + ShardIdentity::new(SHARD_ID, SHARD_NAME.to_string()), + Rc::clone(&bus), + on_replica_message, + on_client_request, + metadata, + partitions, + senders, + inbox, + shards_table, + PartitionConsensusConfig::new(CLUSTER_ID, topology.replica_count, Rc::clone(&bus)), + CoordinatorConfig::default(), + bus.token(), + ) + .build(); + if let Some(refresh_task) = built.refresh_task { + bus.track_background(refresh_task); + } + + let shard = Rc::new(built.shard); + *shard_handle.borrow_mut() = Some(Rc::downgrade(&shard)); + Ok(shard) +} + +const fn validate_recovered_namespace( + config: &ServerNgConfig, + stream_id: usize, + topic_id: usize, + partition_id: usize, +) -> Result<(), ServerNgError> { + let namespace = &config.extra.namespace; + if stream_id < namespace.max_streams + && topic_id < namespace.max_topics + && partition_id < namespace.max_partitions + { + return Ok(()); + } + + Err(ServerNgError::RecoveredNamespaceOutOfBounds { + stream_id, + topic_id, + partition_id, + max_streams: namespace.max_streams, + max_topics: namespace.max_topics, + max_partitions: namespace.max_partitions, + }) +} + +async fn load_partition( + config: &ServerNgConfig, + namespace: IggyNamespace, + topic_stats: Arc<TopicStats>, + partition_metadata: &Partition, + self_replica_id: u8, + replica_count: u8, + bus: Rc<IggyMessageBus>, +) -> Result<IggyPartition<Rc<IggyMessageBus>>, ServerNgError> { + let stream_id = namespace.stream_id(); + let topic_id = namespace.topic_id(); + let partition_id = namespace.partition_id(); + let stats = 
Arc::new(PartitionStats::new(topic_stats)); + let consensus = VsrConsensus::new( + CLUSTER_ID, + self_replica_id, + replica_count, + namespace.inner(), + bus, + LocalPipeline::new(), + ); + consensus.init(); + + // TODO: decouple the loading logic from the `server` crate and load directly + // into the new `partitions` log/runtime types. + let loaded_log = server::bootstrap::load_segments( + &config.system, + stream_id, + topic_id, + partition_id, + config + .system + .get_partition_path(stream_id, topic_id, partition_id), + stats.clone(), + ) + .await + .map_err(|source| ServerNgError::PartitionLogLoad { + stream_id, + topic_id, + partition_id, + source, + })?; + + let mut partition = IggyPartition::new(stats.clone(), consensus); + hydrate_partition_log( + &mut partition, + config, + stream_id, + topic_id, + partition_id, + loaded_log, + ) + .await?; + + let current_offset = partition + .log + .segments() + .iter() + .filter(|segment| segment.size > IggyByteSize::default()) + .map(|segment| segment.end_offset) + .max() + .unwrap_or(0); + partition.created_at = partition_metadata.created_at; + partition.offset.store(current_offset, Ordering::Release); + partition + .dirty_offset + .store(current_offset, Ordering::Relaxed); + partition.should_increment_offset = partition + .log + .segments() + .iter() + .any(|segment| segment.size > IggyByteSize::default()); + partition.stats.set_current_offset(current_offset); + + configure_consumer_offsets(&mut partition, config, namespace, current_offset)?; + ensure_initial_segment(&mut partition, config, stream_id, topic_id, partition_id).await?; + + Ok(partition) +} + +async fn hydrate_partition_log( + partition: &mut IggyPartition<Rc<IggyMessageBus>>, + config: &ServerNgConfig, + stream_id: usize, + topic_id: usize, + partition_id: usize, + loaded_log: server::streaming::partitions::log::SegmentedLog< + server::streaming::partitions::journal::MemoryMessageJournal, + >, +) -> Result<(), ServerNgError> { + // TODO: decouple the 
loading logic from the `server` crate. This currently + // adapts the old server segmented log into the new `partitions` log. + for (segment_index, (segment, storage)) in loaded_log + .segments() + .iter() + .zip(loaded_log.storages().iter().cloned()) + .enumerate() + { + validate_recovered_segment( + stream_id, + topic_id, + partition_id, + segment, + &storage, + loaded_log + .indexes() + .get(segment_index) + .and_then(|indexes| indexes.as_ref()), + )?; + let max_timestamp = match loaded_log + .indexes() + .get(segment_index) + .and_then(|indexes| indexes.as_ref()) + { + Some(indexes) => indexes_max_timestamp(indexes), + None => load_segment_max_timestamp(&storage, stream_id, topic_id, partition_id).await?, + }; + partition.log.add_persisted_segment( + convert_segment(segment, max_timestamp), + storage, + None, + None, + ); + } + + if let Some(active_index) = partition.log.segments().len().checked_sub(1) { + let storage = &partition.log.storages()[active_index]; + if let (Some(messages_reader), Some(index_reader)) = ( + storage.messages_reader.as_ref(), + storage.index_reader.as_ref(), + ) { + let index_path = index_reader.path(); + let index_size = std::fs::metadata(&index_path).map_or(0, |metadata| metadata.len()); + partition.log.messages_writers_mut()[active_index] = Some(Rc::new( + MessagesWriter::new( + &messages_reader.path(), + Rc::new(AtomicU64::new(u64::from(messages_reader.file_size()))), + config.system.partition.enforce_fsync, + true, + ) + .await + .map_err(|source| ServerNgError::MessagesWriterInit { + stream_id, + topic_id, + partition_id, + source, + })?, + )); + partition.log.index_writers_mut()[active_index] = Some(Rc::new( + IggyIndexWriter::new( + &index_path, + Rc::new(AtomicU64::new(index_size)), + config.system.partition.enforce_fsync, + true, + ) + .await + .map_err(|source| ServerNgError::IndexWriterInit { + stream_id, + topic_id, + partition_id, + source, + })?, + )); + } + } + + Ok(()) +} + +fn validate_recovered_segment( + stream_id: 
usize, + topic_id: usize, + partition_id: usize, + segment: &iggy_common::Segment, + storage: &iggy_common::SegmentStorage, + indexes: Option<&server::streaming::segments::IggyIndexesMut>, +) -> Result<(), ServerNgError> { + let messages_size_bytes = storage + .messages_reader + .as_ref() + .map_or(0, |reader| u64::from(reader.file_size())); + let indexed_size_bytes = indexes.map_or(0, |indexes| u64::from(indexes.messages_size())); + if messages_size_bytes == indexed_size_bytes { Review Comment: `messages_size_bytes == indexed_size_bytes` is the wrong invariant. `iggy_partition.rs:1228-1244` writes one sparse-index entry per `commit_messages` call (gated by `flush_index.is_none()`), recording the start-of-batch position, while the messages file accumulates all bytes of every batch. As a result, after any clean shutdown of multi-batch server-ng-written data, `messages_size_bytes > indexed_size_bytes` holds strictly, so this validator returns `RecoveredSegmentSizeDivergence` on every restart. Note: the legacy path (`core/server/src/shard/system/messages.rs:597,665`) writes one index entry per message, so old-format data still passes this check, but new server-ng data does not — recovery cannot validate its own output. Possible fixes: drop the equality check, compute the expected size from the index plus the last batch's length, or record the file size in the segment metadata. ########## core/server-ng/src/bootstrap.rs: ########## @@ -0,0 +1,1474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::config_writer::write_current_config; +use crate::server_error::ServerNgError; +use configs::server_ng::ServerNgConfig; +use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus}; +use iggy_binary_protocol::RequestHeader; +use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; +use iggy_common::{ + ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, TopicStats, + sharding::LocalIdx, variadic, +}; +use journal::Journal; +use journal::prepare_journal::PrepareJournal; +use message_bus::client_listener::{self, RequestHandler}; +use message_bus::fd_transfer; +use message_bus::installer; +use message_bus::installer::ConnectionInstaller; +use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind}; +use message_bus::replica::io as replica_io; +use message_bus::replica::listener::{self as replica_listener, MessageHandler}; +use message_bus::transports::quic::server_config_with_cert; +use message_bus::transports::tls::{ + TlsServerCredentials, install_default_crypto_provider, load_pem, self_signed_for_loopback, +}; +use message_bus::{ + AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, AcceptedTlsClientFn, + AcceptedWsClientFn, IggyMessageBus, connector, +}; +use metadata::IggyMetadata; +use metadata::MuxStateMachine; +use metadata::impls::metadata::{IggySnapshot, StreamsFrontend}; +use metadata::impls::recovery::recover; +use metadata::stm::consumer_group::ConsumerGroups; +use metadata::stm::snapshot::Snapshot; +use metadata::stm::stream::{Partition, Streams}; 
+use metadata::stm::user::Users; +use partitions::{ + IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, PartitionsConfig, Segment, +}; +// TODO: decouple bootstrap/storage helpers and logging from the `server` crate. +use server::bootstrap::create_directories; +use server::log::logger::Logging; +use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; +use server::streaming::segments::storage::create_segment_storage; +use shard::builder::IggyShardBuilder; +use shard::shards_table::PapayaShardsTable; +use shard::{ + CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, channel, shard_channel, +}; +use std::cell::{Cell, RefCell}; +use std::future::Future; +use std::net::{IpAddr, SocketAddr}; +use std::path::{Path, PathBuf}; +use std::rc::{Rc, Weak}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use tracing::{info, warn}; + +const CLUSTER_ID: u128 = 1; +const SHARD_ID: u16 = 0; +const SHARD_REPLICA_ID: u8 = 0; +const SHARD_NAME: &str = "server-ng-shard-0"; +const SHARD_INBOX_CAPACITY: usize = 1024; + +type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>; +type ServerNgMetadata = IggyMetadata< + VsrConsensus<Rc<IggyMessageBus>>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, +>; +type ServerNgShard = IggyShard< + Rc<IggyMessageBus>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, + PapayaShardsTable, +>; + +type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>; + +struct TcpTopology { + self_replica_id: u8, + replica_count: u8, + client_listen_addr: SocketAddr, + replica_listen_addr: Option<SocketAddr>, + ws_listen_addr: Option<SocketAddr>, + quic_listen_addr: Option<SocketAddr>, + tcp_tls_listen_addr: Option<SocketAddr>, + peers: Vec<(u8, SocketAddr)>, +} + +struct LocalClientAcceptFns { + tcp: AcceptedClientFn, + ws: AcceptedWsClientFn, + quic: AcceptedQuicClientFn, + tcp_tls: AcceptedTlsClientFn, 
+} + +#[derive(Default)] +struct BoundClientListeners { + tcp: Option<SocketAddr>, + tcp_tls: Option<SocketAddr>, + ws: Option<SocketAddr>, + quic: Option<SocketAddr>, +} + +pub trait RunServerNg { + fn run( + &self, + config: &ServerNgConfig, + current_replica_id: Option<u8>, + ) -> impl Future<Output = Result<(), ServerNgError>>; +} + +impl RunServerNg for Rc<ServerNgShard> { + /// Run the fully bootstrapped `server-ng` shard. + /// + /// # Errors + /// + /// Returns an error if TCP listener bootstrap fails or cluster TCP + /// addresses cannot be resolved from config. + async fn run( + &self, + config: &ServerNgConfig, + current_replica_id: Option<u8>, + ) -> Result<(), ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let (stop_tx, stop_rx) = channel(1); + let message_pump_shard = Self::clone(self); + let message_pump_handle = compio::runtime::spawn(async move { + message_pump_shard.run_message_pump(stop_rx).await; + }); + self.bus.track_background(message_pump_handle); + + let on_replica_message = make_replica_message_handler(self); + let on_client_request = make_client_request_handler(self); + let accepted_replica = make_local_replica_accept_fn(&self.bus, on_replica_message); + let accepted_client = make_local_client_accept_fns(&self.bus, on_client_request); + + info!( + shard = self.id, + partitions = self.plane.partitions().len(), + "server-ng shard initialized" + ); + + if let Err(error) = + start_tcp_runtime(self, config, &topology, accepted_replica, accepted_client).await + { + let _ = stop_tx.try_send(()); + return Err(error); + } + + self.bus.token().wait().await; + let _ = stop_tx.try_send(()); + Ok(()) + } +} + +/// Load config, prepare directories, and complete late logging init. +/// +/// # Errors +/// +/// Returns an error if config loading, directory preparation, or logging +/// setup fails. 
+pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, ServerNgError> { + let config = ServerNgConfig::load() + .await + .map_err(ServerNgError::Config)?; + // TODO: decouple directory bootstrap from the `server` crate. + create_directories(&config.system) + .await + .map_err(ServerNgError::CreateDirectories)?; + logging + .late_init( + config.system.get_system_path(), + &config.system.logging, + &config.telemetry, + ) + .map_err(ServerNgError::Logging)?; + + Ok(config) +} + +/// Bootstraps `server-ng` from config and on-disk metadata/partition state. +/// +/// # Errors +/// +/// Returns an error if metadata recovery, consensus restoration, or +/// partition hydration fails. +pub async fn bootstrap( + config: &ServerNgConfig, + current_replica_id: Option<u8>, +) -> Result<Rc<ServerNgShard>, ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config)); + let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path)) + .await + .map_err(ServerNgError::MetadataRecovery)?; + let restored_op = recovered.last_applied_op.unwrap_or_else(|| { + recovered + .snapshot + .as_ref() + .map_or(0, IggySnapshot::sequence_number) + }); + + let metadata = ServerNgMetadata::new( + Some(restore_metadata_consensus( + &recovered.journal, + restored_op, + topology.self_replica_id, + topology.replica_count, + Rc::clone(&bus), + )), + Some(recovered.journal), + recovered.snapshot, + recovered.mux_stm, + Some(PathBuf::from(&config.system.path)), + ); + let shard = build_single_shard(config, &topology, metadata, bus).await?; + info!(shard = shard.id, "server-ng bootstrap complete"); + + Ok(shard) +} + +fn restore_metadata_consensus( + journal: &PrepareJournal, + restored_op: u64, + self_replica_id: u8, + replica_count: u8, + bus: Rc<IggyMessageBus>, +) -> VsrConsensus<Rc<IggyMessageBus>> { + let mut consensus = VsrConsensus::new( + CLUSTER_ID, + self_replica_id, + 
replica_count, + 0, + bus, + LocalPipeline::new(), + ); + + let last_header = journal + .last_op() + .and_then(|op| usize::try_from(op).ok()) + .and_then(|op| journal.header(op).map(|header| *header)); + if let Some(header) = last_header { + consensus.set_view(header.view); + } + + consensus.init(); + consensus.sequencer().set_sequence(restored_op); + consensus.restore_commit_state(restored_op, restored_op); + if let Some(header) = last_header { + consensus.set_last_prepare_checksum(header.checksum); + consensus.set_log_view(header.view); + } + + consensus +} + +async fn build_single_shard( + config: &ServerNgConfig, + topology: &TcpTopology, + metadata: ServerNgMetadata, + bus: Rc<IggyMessageBus>, +) -> Result<Rc<ServerNgShard>, ServerNgError> { + let shard_id = ShardId::new(SHARD_ID); + let partition_count = metadata.mux_stm.streams().read(|inner| { + inner + .items + .iter() + .map(|(_, stream)| { + stream + .topics + .iter() + .map(|(_, topic)| topic.partitions.len()) + .sum::<usize>() + }) + .sum() + }); + let mut partitions = IggyPartitions::with_capacity( + shard_id, + PartitionsConfig { + messages_required_to_save: config.system.partition.messages_required_to_save, + size_of_messages_required_to_save: config + .system + .partition + .size_of_messages_required_to_save, + enforce_fsync: config.system.partition.enforce_fsync, + segment_size: config.system.segment.size, + }, + partition_count, + ); + let shards_table = PapayaShardsTable::with_capacity(partition_count); + + let (topic_stats, namespaces) = metadata.mux_stm.streams().read(|inner| { + let mut topic_stats = Vec::new(); + let mut namespaces = Vec::with_capacity(partition_count); + for (_, stream) in &inner.items { + for (topic_id, topic) in &stream.topics { + topic_stats.push(topic.stats.clone()); + for partition in &topic.partitions { + namespaces.push((stream.id, topic_id, topic.stats.clone(), partition.clone())); + } + } + } + (topic_stats, namespaces) + }); + + for topic_stats in topic_stats { + 
topic_stats.zero_out_all(); + } + + for (stream_id, topic_id, topic_stats, partition_metadata) in namespaces { + validate_recovered_namespace(config, stream_id, topic_id, partition_metadata.id)?; + let namespace = IggyNamespace::new(stream_id, topic_id, partition_metadata.id); + let partition = load_partition( + config, + namespace, + topic_stats, + &partition_metadata, + topology.self_replica_id, + topology.replica_count, + Rc::clone(&bus), + ) + .await?; + let local_idx = partitions.insert(namespace, partition); + shards_table.insert( + namespace, + PartitionLocation::new(shard_id, LocalIdx::new(*local_idx)), + ); + } + + let (sender, inbox) = shard_channel::<()>(SHARD_ID, SHARD_INBOX_CAPACITY); + let senders = vec![sender]; + let shard_handle = Rc::new(RefCell::new(None)); + let on_replica_message = make_deferred_replica_message_handler(&shard_handle); + let on_client_request = make_deferred_client_request_handler(&shard_handle); + let built = IggyShardBuilder::new( + ShardIdentity::new(SHARD_ID, SHARD_NAME.to_string()), + Rc::clone(&bus), + on_replica_message, + on_client_request, + metadata, + partitions, + senders, + inbox, + shards_table, + PartitionConsensusConfig::new(CLUSTER_ID, topology.replica_count, Rc::clone(&bus)), + CoordinatorConfig::default(), + bus.token(), + ) + .build(); + if let Some(refresh_task) = built.refresh_task { + bus.track_background(refresh_task); + } + + let shard = Rc::new(built.shard); + *shard_handle.borrow_mut() = Some(Rc::downgrade(&shard)); + Ok(shard) +} + +const fn validate_recovered_namespace( + config: &ServerNgConfig, + stream_id: usize, + topic_id: usize, + partition_id: usize, +) -> Result<(), ServerNgError> { + let namespace = &config.extra.namespace; + if stream_id < namespace.max_streams + && topic_id < namespace.max_topics + && partition_id < namespace.max_partitions + { + return Ok(()); + } + + Err(ServerNgError::RecoveredNamespaceOutOfBounds { + stream_id, + topic_id, + partition_id, + max_streams: 
namespace.max_streams, + max_topics: namespace.max_topics, + max_partitions: namespace.max_partitions, + }) +} + +async fn load_partition( + config: &ServerNgConfig, + namespace: IggyNamespace, + topic_stats: Arc<TopicStats>, + partition_metadata: &Partition, + self_replica_id: u8, + replica_count: u8, + bus: Rc<IggyMessageBus>, +) -> Result<IggyPartition<Rc<IggyMessageBus>>, ServerNgError> { + let stream_id = namespace.stream_id(); + let topic_id = namespace.topic_id(); + let partition_id = namespace.partition_id(); + let stats = Arc::new(PartitionStats::new(topic_stats)); + let consensus = VsrConsensus::new( + CLUSTER_ID, + self_replica_id, + replica_count, + namespace.inner(), + bus, + LocalPipeline::new(), + ); + consensus.init(); + + // TODO: decouple the loading logic from the `server` crate and load directly + // into the new `partitions` log/runtime types. + let loaded_log = server::bootstrap::load_segments( + &config.system, + stream_id, + topic_id, + partition_id, + config + .system + .get_partition_path(stream_id, topic_id, partition_id), + stats.clone(), + ) + .await + .map_err(|source| ServerNgError::PartitionLogLoad { + stream_id, + topic_id, + partition_id, + source, + })?; + + let mut partition = IggyPartition::new(stats.clone(), consensus); + hydrate_partition_log( + &mut partition, + config, + stream_id, + topic_id, + partition_id, + loaded_log, + ) + .await?; + + let current_offset = partition + .log + .segments() + .iter() + .filter(|segment| segment.size > IggyByteSize::default()) + .map(|segment| segment.end_offset) + .max() + .unwrap_or(0); + partition.created_at = partition_metadata.created_at; + partition.offset.store(current_offset, Ordering::Release); + partition + .dirty_offset + .store(current_offset, Ordering::Relaxed); + partition.should_increment_offset = partition + .log + .segments() + .iter() + .any(|segment| segment.size > IggyByteSize::default()); + partition.stats.set_current_offset(current_offset); + + 
configure_consumer_offsets(&mut partition, config, namespace, current_offset)?; + ensure_initial_segment(&mut partition, config, stream_id, topic_id, partition_id).await?; + + Ok(partition) +} + +async fn hydrate_partition_log( + partition: &mut IggyPartition<Rc<IggyMessageBus>>, + config: &ServerNgConfig, + stream_id: usize, + topic_id: usize, + partition_id: usize, + loaded_log: server::streaming::partitions::log::SegmentedLog< + server::streaming::partitions::journal::MemoryMessageJournal, + >, +) -> Result<(), ServerNgError> { + // TODO: decouple the loading logic from the `server` crate. This currently + // adapts the old server segmented log into the new `partitions` log. + for (segment_index, (segment, storage)) in loaded_log + .segments() + .iter() + .zip(loaded_log.storages().iter().cloned()) + .enumerate() + { + validate_recovered_segment( + stream_id, + topic_id, + partition_id, + segment, + &storage, + loaded_log + .indexes() + .get(segment_index) + .and_then(|indexes| indexes.as_ref()), + )?; + let max_timestamp = match loaded_log + .indexes() + .get(segment_index) + .and_then(|indexes| indexes.as_ref()) + { + Some(indexes) => indexes_max_timestamp(indexes), + None => load_segment_max_timestamp(&storage, stream_id, topic_id, partition_id).await?, + }; + partition.log.add_persisted_segment( + convert_segment(segment, max_timestamp), + storage, + None, + None, + ); + } + + if let Some(active_index) = partition.log.segments().len().checked_sub(1) { + let storage = &partition.log.storages()[active_index]; + if let (Some(messages_reader), Some(index_reader)) = ( + storage.messages_reader.as_ref(), + storage.index_reader.as_ref(), + ) { + let index_path = index_reader.path(); + let index_size = std::fs::metadata(&index_path).map_or(0, |metadata| metadata.len()); + partition.log.messages_writers_mut()[active_index] = Some(Rc::new( + MessagesWriter::new( + &messages_reader.path(), + Rc::new(AtomicU64::new(u64::from(messages_reader.file_size()))), + 
config.system.partition.enforce_fsync, + true, + ) + .await + .map_err(|source| ServerNgError::MessagesWriterInit { + stream_id, + topic_id, + partition_id, + source, + })?, + )); + partition.log.index_writers_mut()[active_index] = Some(Rc::new( + IggyIndexWriter::new( + &index_path, + Rc::new(AtomicU64::new(index_size)), + config.system.partition.enforce_fsync, + true, + ) + .await + .map_err(|source| ServerNgError::IndexWriterInit { + stream_id, + topic_id, + partition_id, + source, + })?, + )); + } + } + + Ok(()) +} + +fn validate_recovered_segment( + stream_id: usize, + topic_id: usize, + partition_id: usize, + segment: &iggy_common::Segment, + storage: &iggy_common::SegmentStorage, + indexes: Option<&server::streaming::segments::IggyIndexesMut>, +) -> Result<(), ServerNgError> { + let messages_size_bytes = storage + .messages_reader + .as_ref() + .map_or(0, |reader| u64::from(reader.file_size())); + let indexed_size_bytes = indexes.map_or(0, |indexes| u64::from(indexes.messages_size())); + if messages_size_bytes == indexed_size_bytes { + return Ok(()); + } + + Err(ServerNgError::RecoveredSegmentSizeDivergence { + stream_id, + topic_id, + partition_id, + start_offset: segment.start_offset, + end_offset: segment.end_offset, + messages_size_bytes, + indexed_size_bytes, + }) +} + +fn convert_segment(segment: &iggy_common::Segment, max_timestamp: u64) -> Segment { + Segment { + sealed: segment.sealed, + start_timestamp: segment.start_timestamp, + end_timestamp: segment.end_timestamp, + max_timestamp, + current_position: u64::from(segment.current_position), + start_offset: segment.start_offset, + end_offset: segment.end_offset, + size: segment.size, + max_size: segment.max_size, + } +} + +fn indexes_max_timestamp(indexes: &server::streaming::segments::IggyIndexesMut) -> u64 { + let mut max_timestamp = 0; + for index in 0..indexes.count() { + if let Some(index_view) = indexes.get(index) { + max_timestamp = max_timestamp.max(index_view.timestamp()); + } + } + + 
max_timestamp +} + +async fn load_segment_max_timestamp( + storage: &iggy_common::SegmentStorage, + stream_id: usize, + topic_id: usize, + partition_id: usize, +) -> Result<u64, ServerNgError> { + let Some(index_reader) = storage.index_reader.as_ref() else { + return Ok(0); + }; + + let indexes = index_reader + .load_all_indexes_from_disk() + .await + .map_err(|source| ServerNgError::SegmentIndexesLoad { + stream_id, + topic_id, + partition_id, + source, + })?; + Ok(indexes_max_timestamp(&indexes)) +} + +fn configure_consumer_offsets( + partition: &mut IggyPartition<Rc<IggyMessageBus>>, + config: &ServerNgConfig, + namespace: IggyNamespace, + current_offset: u64, +) -> Result<(), ServerNgError> { + let stream_id = namespace.stream_id(); + let topic_id = namespace.topic_id(); + let partition_id = namespace.partition_id(); + let consumer_offsets_path = + config + .system + .get_consumer_offsets_path(stream_id, topic_id, partition_id); + let consumer_group_offsets_path = + config + .system + .get_consumer_group_offsets_path(stream_id, topic_id, partition_id); + + // TODO: decouple consumer offset loading from the `server` crate. + let loaded_consumer_offsets = load_consumer_offsets(&consumer_offsets_path).unwrap_or_default(); Review Comment: also at line 691. `load_consumer_offsets` (and the group variant) at `core/server/src/streaming/partitions/storage.rs:165-168,200,208,227-230` returns `IggyError::CannotReadConsumerOffsets` / `CannotReadFile` for any read failure: ENOENT (legitimate first-boot empty), EACCES, EBUSY, EIO. all collapse into `Vec::default()` here. a transient FS failure during boot silently resets consumer offsets, so consumers re-read messages already consumed. fix: propagate via a new `ServerNgError::ConsumerOffsetsLoad` variant. distinguish "directory missing" (legitimate empty on first boot) from "load failed" if needed. 
########## core/server-ng/src/bootstrap.rs: ########## @@ -0,0 +1,1474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::config_writer::write_current_config; +use crate::server_error::ServerNgError; +use configs::server_ng::ServerNgConfig; +use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus}; +use iggy_binary_protocol::RequestHeader; +use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; +use iggy_common::{ + ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, TopicStats, + sharding::LocalIdx, variadic, +}; +use journal::Journal; +use journal::prepare_journal::PrepareJournal; +use message_bus::client_listener::{self, RequestHandler}; +use message_bus::fd_transfer; +use message_bus::installer; +use message_bus::installer::ConnectionInstaller; +use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind}; +use message_bus::replica::io as replica_io; +use message_bus::replica::listener::{self as replica_listener, MessageHandler}; +use message_bus::transports::quic::server_config_with_cert; +use message_bus::transports::tls::{ + TlsServerCredentials, install_default_crypto_provider, load_pem, self_signed_for_loopback, +}; 
+use message_bus::{ + AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, AcceptedTlsClientFn, + AcceptedWsClientFn, IggyMessageBus, connector, +}; +use metadata::IggyMetadata; +use metadata::MuxStateMachine; +use metadata::impls::metadata::{IggySnapshot, StreamsFrontend}; +use metadata::impls::recovery::recover; +use metadata::stm::consumer_group::ConsumerGroups; +use metadata::stm::snapshot::Snapshot; +use metadata::stm::stream::{Partition, Streams}; +use metadata::stm::user::Users; +use partitions::{ + IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, PartitionsConfig, Segment, +}; +// TODO: decouple bootstrap/storage helpers and logging from the `server` crate. +use server::bootstrap::create_directories; +use server::log::logger::Logging; +use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; +use server::streaming::segments::storage::create_segment_storage; +use shard::builder::IggyShardBuilder; +use shard::shards_table::PapayaShardsTable; +use shard::{ + CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, channel, shard_channel, +}; +use std::cell::{Cell, RefCell}; +use std::future::Future; +use std::net::{IpAddr, SocketAddr}; +use std::path::{Path, PathBuf}; +use std::rc::{Rc, Weak}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use tracing::{info, warn}; + +const CLUSTER_ID: u128 = 1; +const SHARD_ID: u16 = 0; +const SHARD_REPLICA_ID: u8 = 0; +const SHARD_NAME: &str = "server-ng-shard-0"; +const SHARD_INBOX_CAPACITY: usize = 1024; + +type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>; +type ServerNgMetadata = IggyMetadata< + VsrConsensus<Rc<IggyMessageBus>>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, +>; +type ServerNgShard = IggyShard< + Rc<IggyMessageBus>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, + PapayaShardsTable, +>; + +type ServerNgShardHandle = 
Rc<RefCell<Option<Weak<ServerNgShard>>>>; + +struct TcpTopology { + self_replica_id: u8, + replica_count: u8, + client_listen_addr: SocketAddr, + replica_listen_addr: Option<SocketAddr>, + ws_listen_addr: Option<SocketAddr>, + quic_listen_addr: Option<SocketAddr>, + tcp_tls_listen_addr: Option<SocketAddr>, + peers: Vec<(u8, SocketAddr)>, +} + +struct LocalClientAcceptFns { + tcp: AcceptedClientFn, + ws: AcceptedWsClientFn, + quic: AcceptedQuicClientFn, + tcp_tls: AcceptedTlsClientFn, +} + +#[derive(Default)] +struct BoundClientListeners { + tcp: Option<SocketAddr>, + tcp_tls: Option<SocketAddr>, + ws: Option<SocketAddr>, + quic: Option<SocketAddr>, +} + +pub trait RunServerNg { + fn run( + &self, + config: &ServerNgConfig, + current_replica_id: Option<u8>, + ) -> impl Future<Output = Result<(), ServerNgError>>; +} + +impl RunServerNg for Rc<ServerNgShard> { + /// Run the fully bootstrapped `server-ng` shard. + /// + /// # Errors + /// + /// Returns an error if TCP listener bootstrap fails or cluster TCP + /// addresses cannot be resolved from config. 
+ async fn run( + &self, + config: &ServerNgConfig, + current_replica_id: Option<u8>, + ) -> Result<(), ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let (stop_tx, stop_rx) = channel(1); + let message_pump_shard = Self::clone(self); + let message_pump_handle = compio::runtime::spawn(async move { + message_pump_shard.run_message_pump(stop_rx).await; + }); + self.bus.track_background(message_pump_handle); + + let on_replica_message = make_replica_message_handler(self); + let on_client_request = make_client_request_handler(self); + let accepted_replica = make_local_replica_accept_fn(&self.bus, on_replica_message); + let accepted_client = make_local_client_accept_fns(&self.bus, on_client_request); + + info!( + shard = self.id, + partitions = self.plane.partitions().len(), + "server-ng shard initialized" + ); + + if let Err(error) = + start_tcp_runtime(self, config, &topology, accepted_replica, accepted_client).await + { + let _ = stop_tx.try_send(()); + return Err(error); + } + + self.bus.token().wait().await; + let _ = stop_tx.try_send(()); + Ok(()) + } +} + +/// Load config, prepare directories, and complete late logging init. +/// +/// # Errors +/// +/// Returns an error if config loading, directory preparation, or logging +/// setup fails. +pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, ServerNgError> { + let config = ServerNgConfig::load() + .await + .map_err(ServerNgError::Config)?; + // TODO: decouple directory bootstrap from the `server` crate. + create_directories(&config.system) + .await + .map_err(ServerNgError::CreateDirectories)?; + logging + .late_init( + config.system.get_system_path(), + &config.system.logging, + &config.telemetry, + ) + .map_err(ServerNgError::Logging)?; + + Ok(config) +} + +/// Bootstraps `server-ng` from config and on-disk metadata/partition state. 
+/// +/// # Errors +/// +/// Returns an error if metadata recovery, consensus restoration, or +/// partition hydration fails. +pub async fn bootstrap( + config: &ServerNgConfig, + current_replica_id: Option<u8>, +) -> Result<Rc<ServerNgShard>, ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config)); + let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path)) + .await + .map_err(ServerNgError::MetadataRecovery)?; + let restored_op = recovered.last_applied_op.unwrap_or_else(|| { + recovered + .snapshot + .as_ref() + .map_or(0, IggySnapshot::sequence_number) + }); + + let metadata = ServerNgMetadata::new( + Some(restore_metadata_consensus( + &recovered.journal, + restored_op, + topology.self_replica_id, + topology.replica_count, + Rc::clone(&bus), + )), + Some(recovered.journal), + recovered.snapshot, + recovered.mux_stm, + Some(PathBuf::from(&config.system.path)), + ); + let shard = build_single_shard(config, &topology, metadata, bus).await?; + info!(shard = shard.id, "server-ng bootstrap complete"); + + Ok(shard) +} + +fn restore_metadata_consensus( + journal: &PrepareJournal, + restored_op: u64, + self_replica_id: u8, + replica_count: u8, + bus: Rc<IggyMessageBus>, +) -> VsrConsensus<Rc<IggyMessageBus>> { + let mut consensus = VsrConsensus::new( + CLUSTER_ID, + self_replica_id, + replica_count, + 0, + bus, + LocalPipeline::new(), + ); + + let last_header = journal + .last_op() + .and_then(|op| usize::try_from(op).ok()) + .and_then(|op| journal.header(op).map(|header| *header)); + if let Some(header) = last_header { Review Comment: `set_log_view(header.view)` uses the view in which the primary appended this entry locally. VSR invariant per the doc-comment at `impls.rs:506-513`: `log_view` is the view in which this replica's log head was quorum-confirmed (won DVC or processed SV from new primary). setting it to a write-time view over-inflates. 
`view_change_quorum.rs:76-84` `dvc_select_winner` picks max `(log_view, op)`. a backup with locally-appended-but-uncommitted ops at view V will beat replicas with the genuinely-quorum-confirmed lower log_view, then `complete_view_change_as_primary` at `impls.rs:1546-1601` forces peers to install the contaminated head via SendStartView. single-node never runs winner selection. multi-replica latent landmine. fix: persist `log_view_durable` in a superblock; advance only on quorum SV/DVC win. ########## core/common/src/sharding/namespace.rs: ########## @@ -49,6 +49,47 @@ pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 1; pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1; pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NamespaceCapacityError { Review Comment: I think we should use derive `Error` from `thiserror` crate. ########## core/server-ng/src/bootstrap.rs: ########## @@ -0,0 +1,1474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::config_writer::write_current_config; +use crate::server_error::ServerNgError; +use configs::server_ng::ServerNgConfig; +use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus}; +use iggy_binary_protocol::RequestHeader; +use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; +use iggy_common::{ + ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, TopicStats, + sharding::LocalIdx, variadic, +}; +use journal::Journal; +use journal::prepare_journal::PrepareJournal; +use message_bus::client_listener::{self, RequestHandler}; +use message_bus::fd_transfer; +use message_bus::installer; +use message_bus::installer::ConnectionInstaller; +use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind}; +use message_bus::replica::io as replica_io; +use message_bus::replica::listener::{self as replica_listener, MessageHandler}; +use message_bus::transports::quic::server_config_with_cert; +use message_bus::transports::tls::{ + TlsServerCredentials, install_default_crypto_provider, load_pem, self_signed_for_loopback, +}; +use message_bus::{ + AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, AcceptedTlsClientFn, + AcceptedWsClientFn, IggyMessageBus, connector, +}; +use metadata::IggyMetadata; +use metadata::MuxStateMachine; +use metadata::impls::metadata::{IggySnapshot, StreamsFrontend}; +use metadata::impls::recovery::recover; +use metadata::stm::consumer_group::ConsumerGroups; +use metadata::stm::snapshot::Snapshot; +use metadata::stm::stream::{Partition, Streams}; +use metadata::stm::user::Users; +use partitions::{ + IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, PartitionsConfig, Segment, +}; +// TODO: decouple bootstrap/storage helpers and logging from the `server` crate. 
+use server::bootstrap::create_directories; +use server::log::logger::Logging; +use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; +use server::streaming::segments::storage::create_segment_storage; +use shard::builder::IggyShardBuilder; +use shard::shards_table::PapayaShardsTable; +use shard::{ + CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, channel, shard_channel, +}; +use std::cell::{Cell, RefCell}; +use std::future::Future; +use std::net::{IpAddr, SocketAddr}; +use std::path::{Path, PathBuf}; +use std::rc::{Rc, Weak}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use tracing::{info, warn}; + +const CLUSTER_ID: u128 = 1; +const SHARD_ID: u16 = 0; +const SHARD_REPLICA_ID: u8 = 0; +const SHARD_NAME: &str = "server-ng-shard-0"; +const SHARD_INBOX_CAPACITY: usize = 1024; + +type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>; +type ServerNgMetadata = IggyMetadata< + VsrConsensus<Rc<IggyMessageBus>>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, +>; +type ServerNgShard = IggyShard< + Rc<IggyMessageBus>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, + PapayaShardsTable, +>; + +type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>; + +struct TcpTopology { + self_replica_id: u8, + replica_count: u8, + client_listen_addr: SocketAddr, + replica_listen_addr: Option<SocketAddr>, + ws_listen_addr: Option<SocketAddr>, + quic_listen_addr: Option<SocketAddr>, + tcp_tls_listen_addr: Option<SocketAddr>, + peers: Vec<(u8, SocketAddr)>, +} + +struct LocalClientAcceptFns { + tcp: AcceptedClientFn, + ws: AcceptedWsClientFn, + quic: AcceptedQuicClientFn, + tcp_tls: AcceptedTlsClientFn, +} + +#[derive(Default)] +struct BoundClientListeners { + tcp: Option<SocketAddr>, + tcp_tls: Option<SocketAddr>, + ws: Option<SocketAddr>, + quic: Option<SocketAddr>, +} + +pub trait RunServerNg { + fn run( + &self, + config: 
&ServerNgConfig, + current_replica_id: Option<u8>, + ) -> impl Future<Output = Result<(), ServerNgError>>; +} + +impl RunServerNg for Rc<ServerNgShard> { + /// Run the fully bootstrapped `server-ng` shard. + /// + /// # Errors + /// + /// Returns an error if TCP listener bootstrap fails or cluster TCP + /// addresses cannot be resolved from config. + async fn run( + &self, + config: &ServerNgConfig, + current_replica_id: Option<u8>, + ) -> Result<(), ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let (stop_tx, stop_rx) = channel(1); + let message_pump_shard = Self::clone(self); + let message_pump_handle = compio::runtime::spawn(async move { + message_pump_shard.run_message_pump(stop_rx).await; + }); + self.bus.track_background(message_pump_handle); + + let on_replica_message = make_replica_message_handler(self); + let on_client_request = make_client_request_handler(self); + let accepted_replica = make_local_replica_accept_fn(&self.bus, on_replica_message); + let accepted_client = make_local_client_accept_fns(&self.bus, on_client_request); + + info!( + shard = self.id, + partitions = self.plane.partitions().len(), + "server-ng shard initialized" + ); + + if let Err(error) = + start_tcp_runtime(self, config, &topology, accepted_replica, accepted_client).await + { + let _ = stop_tx.try_send(()); + return Err(error); + } + + self.bus.token().wait().await; + let _ = stop_tx.try_send(()); + Ok(()) + } +} + +/// Load config, prepare directories, and complete late logging init. +/// +/// # Errors +/// +/// Returns an error if config loading, directory preparation, or logging +/// setup fails. +pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, ServerNgError> { + let config = ServerNgConfig::load() + .await + .map_err(ServerNgError::Config)?; + // TODO: decouple directory bootstrap from the `server` crate. 
+ create_directories(&config.system) + .await + .map_err(ServerNgError::CreateDirectories)?; + logging + .late_init( + config.system.get_system_path(), + &config.system.logging, + &config.telemetry, + ) + .map_err(ServerNgError::Logging)?; + + Ok(config) +} + +/// Bootstraps `server-ng` from config and on-disk metadata/partition state. +/// +/// # Errors +/// +/// Returns an error if metadata recovery, consensus restoration, or +/// partition hydration fails. +pub async fn bootstrap( + config: &ServerNgConfig, + current_replica_id: Option<u8>, +) -> Result<Rc<ServerNgShard>, ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config)); + let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path)) + .await + .map_err(ServerNgError::MetadataRecovery)?; + let restored_op = recovered.last_applied_op.unwrap_or_else(|| { Review Comment: `recovery.rs:139-158` walks the WAL and applies every replayed entry, setting `last_applied_op` to journal tail M. bootstrap then calls `restore_commit_state(M, M)` declaring `commit_min == commit_max == M`. `impls.rs:1110-1130, 1218, 1230` propagate `commit_min` via primary heartbeat and DVC; `view_change_quorum.rs:88-95` `dvc_max_commit` propagates further into peer state via `complete_view_change_as_primary`. WAL entries past the last quorum-confirmed op are NOT committed; bootstrap declares them so. multi-replica safety violation: a replica with uncommitted-but-replayed ops claims they're committed. single-node trivially correct (quorum=1). latent landmine the moment cluster ships. fix: persist a separate commit watermark (superblock) and only restore commit_min up to that watermark. replay-as-applied does not mean replay-as-committed. ########## core/server-ng/src/bootstrap.rs: ########## @@ -0,0 +1,1474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::config_writer::write_current_config; +use crate::server_error::ServerNgError; +use configs::server_ng::ServerNgConfig; +use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus}; +use iggy_binary_protocol::RequestHeader; +use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; +use iggy_common::{ + ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, PartitionStats, TopicStats, + sharding::LocalIdx, variadic, +}; +use journal::Journal; +use journal::prepare_journal::PrepareJournal; +use message_bus::client_listener::{self, RequestHandler}; +use message_bus::fd_transfer; +use message_bus::installer; +use message_bus::installer::ConnectionInstaller; +use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind}; +use message_bus::replica::io as replica_io; +use message_bus::replica::listener::{self as replica_listener, MessageHandler}; +use message_bus::transports::quic::server_config_with_cert; +use message_bus::transports::tls::{ + TlsServerCredentials, install_default_crypto_provider, load_pem, self_signed_for_loopback, +}; +use message_bus::{ + AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, AcceptedTlsClientFn, + AcceptedWsClientFn, IggyMessageBus, connector, +}; +use metadata::IggyMetadata; +use 
metadata::MuxStateMachine; +use metadata::impls::metadata::{IggySnapshot, StreamsFrontend}; +use metadata::impls::recovery::recover; +use metadata::stm::consumer_group::ConsumerGroups; +use metadata::stm::snapshot::Snapshot; +use metadata::stm::stream::{Partition, Streams}; +use metadata::stm::user::Users; +use partitions::{ + IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, PartitionsConfig, Segment, +}; +// TODO: decouple bootstrap/storage helpers and logging from the `server` crate. +use server::bootstrap::create_directories; +use server::log::logger::Logging; +use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; +use server::streaming::segments::storage::create_segment_storage; +use shard::builder::IggyShardBuilder; +use shard::shards_table::PapayaShardsTable; +use shard::{ + CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, channel, shard_channel, +}; +use std::cell::{Cell, RefCell}; +use std::future::Future; +use std::net::{IpAddr, SocketAddr}; +use std::path::{Path, PathBuf}; +use std::rc::{Rc, Weak}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use tracing::{info, warn}; + +const CLUSTER_ID: u128 = 1; +const SHARD_ID: u16 = 0; +const SHARD_REPLICA_ID: u8 = 0; +const SHARD_NAME: &str = "server-ng-shard-0"; +const SHARD_INBOX_CAPACITY: usize = 1024; + +type ServerNgMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>; +type ServerNgMetadata = IggyMetadata< + VsrConsensus<Rc<IggyMessageBus>>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, +>; +type ServerNgShard = IggyShard< + Rc<IggyMessageBus>, + PrepareJournal, + IggySnapshot, + ServerNgMuxStateMachine, + PapayaShardsTable, +>; + +type ServerNgShardHandle = Rc<RefCell<Option<Weak<ServerNgShard>>>>; + +struct TcpTopology { + self_replica_id: u8, + replica_count: u8, + client_listen_addr: SocketAddr, + replica_listen_addr: Option<SocketAddr>, + ws_listen_addr: 
Option<SocketAddr>, + quic_listen_addr: Option<SocketAddr>, + tcp_tls_listen_addr: Option<SocketAddr>, + peers: Vec<(u8, SocketAddr)>, +} + +struct LocalClientAcceptFns { + tcp: AcceptedClientFn, + ws: AcceptedWsClientFn, + quic: AcceptedQuicClientFn, + tcp_tls: AcceptedTlsClientFn, +} + +#[derive(Default)] +struct BoundClientListeners { + tcp: Option<SocketAddr>, + tcp_tls: Option<SocketAddr>, + ws: Option<SocketAddr>, + quic: Option<SocketAddr>, +} + +pub trait RunServerNg { + fn run( + &self, + config: &ServerNgConfig, + current_replica_id: Option<u8>, + ) -> impl Future<Output = Result<(), ServerNgError>>; +} + +impl RunServerNg for Rc<ServerNgShard> { + /// Run the fully bootstrapped `server-ng` shard. + /// + /// # Errors + /// + /// Returns an error if TCP listener bootstrap fails or cluster TCP + /// addresses cannot be resolved from config. + async fn run( + &self, + config: &ServerNgConfig, + current_replica_id: Option<u8>, + ) -> Result<(), ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let (stop_tx, stop_rx) = channel(1); + let message_pump_shard = Self::clone(self); + let message_pump_handle = compio::runtime::spawn(async move { + message_pump_shard.run_message_pump(stop_rx).await; + }); + self.bus.track_background(message_pump_handle); + + let on_replica_message = make_replica_message_handler(self); + let on_client_request = make_client_request_handler(self); + let accepted_replica = make_local_replica_accept_fn(&self.bus, on_replica_message); + let accepted_client = make_local_client_accept_fns(&self.bus, on_client_request); + + info!( + shard = self.id, + partitions = self.plane.partitions().len(), + "server-ng shard initialized" + ); + + if let Err(error) = + start_tcp_runtime(self, config, &topology, accepted_replica, accepted_client).await + { + let _ = stop_tx.try_send(()); + return Err(error); + } + + self.bus.token().wait().await; + let _ = stop_tx.try_send(()); + Ok(()) + } +} + +/// Load config, 
prepare directories, and complete late logging init. +/// +/// # Errors +/// +/// Returns an error if config loading, directory preparation, or logging +/// setup fails. +pub async fn load_config(logging: &mut Logging) -> Result<ServerNgConfig, ServerNgError> { + let config = ServerNgConfig::load() + .await + .map_err(ServerNgError::Config)?; + // TODO: decouple directory bootstrap from the `server` crate. + create_directories(&config.system) + .await + .map_err(ServerNgError::CreateDirectories)?; + logging + .late_init( + config.system.get_system_path(), + &config.system.logging, + &config.telemetry, + ) + .map_err(ServerNgError::Logging)?; + + Ok(config) +} + +/// Bootstraps `server-ng` from config and on-disk metadata/partition state. +/// +/// # Errors +/// +/// Returns an error if metadata recovery, consensus restoration, or +/// partition hydration fails. +pub async fn bootstrap( + config: &ServerNgConfig, + current_replica_id: Option<u8>, +) -> Result<Rc<ServerNgShard>, ServerNgError> { + let topology = resolve_tcp_topology(config, current_replica_id)?; + let bus = Rc::new(IggyMessageBus::with_config(SHARD_ID, config)); + let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path)) + .await + .map_err(ServerNgError::MetadataRecovery)?; + let restored_op = recovered.last_applied_op.unwrap_or_else(|| { + recovered + .snapshot + .as_ref() + .map_or(0, IggySnapshot::sequence_number) + }); + + let metadata = ServerNgMetadata::new( + Some(restore_metadata_consensus( + &recovered.journal, + restored_op, + topology.self_replica_id, + topology.replica_count, + Rc::clone(&bus), + )), + Some(recovered.journal), + recovered.snapshot, + recovered.mux_stm, + Some(PathBuf::from(&config.system.path)), + ); + let shard = build_single_shard(config, &topology, metadata, bus).await?; + info!(shard = shard.id, "server-ng bootstrap complete"); + + Ok(shard) +} + +fn restore_metadata_consensus( + journal: &PrepareJournal, + restored_op: u64, + 
self_replica_id: u8, + replica_count: u8, + bus: Rc<IggyMessageBus>, +) -> VsrConsensus<Rc<IggyMessageBus>> { + let mut consensus = VsrConsensus::new( + CLUSTER_ID, + self_replica_id, + replica_count, + 0, + bus, + LocalPipeline::new(), + ); + + let last_header = journal Review Comment: `last_header` is Some only if `journal.last_op()` AND `journal.header(op)` both Some. after `journal.drain(0..=N)` (used during snapshot+truncate at `prepare_journal.rs:399-405`), slots are cleared but the `last_op` cell is NOT updated by drain. so `journal.last_op() = Some(N)` while `journal.header(N) = None`, `last_header = None` here, `last_prepare_checksum=0`, `log_view=0`. sequencer correctly set to N. next prepare's `parent=0`. single-node: silent, no peer to detect. on-disk invariant "WAL chains continuously across snapshot boundary" broken, matters for offline tooling and any future joining peer. fix: persist `last_prepare_checksum` + `log_view` in the snapshot itself, plumb through `restore_metadata_consensus`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
