jayakasadev commented on code in PR #3269:
URL: https://github.com/apache/iggy/pull/3269#discussion_r3267870664


##########
core/journal/src/prepare_journal.rs:
##########
@@ -387,6 +437,18 @@ impl Journal<FileStorage> for PrepareJournal {
         // Reopen the file descriptor at the same path.
         self.storage.reopen().await?;
 
+        // Advance the snapshot watermark only AFTER the WAL rewrite is
+        // durable (tmp create -> write -> fsync -> rename -> fsync parent
+        // -> reopen). Advancing earlier would leave `snapshot_op` past
+        // entries still present on disk on any `?` failure above, letting
+        // a future `append()` pass the slot collision check at
+        // `existing.op <= snapshot_op` and silently evict a live entry
+        // from the index. The entry would survive on disk but become
+        // unreachable, stalling `RetransmitPrepares` until view change.
+        if end_op > self.snapshot_op.get() {
+            self.snapshot_op.set(end_op);

Review Comment:
   Good catch. LGTM on the fix.



##########
core/journal/src/prepare_journal.rs:
##########
@@ -143,27 +186,40 @@ impl PrepareJournal {
         while pos + HEADER_SIZE as u64 <= file_len {
             // Read the 256-byte header
             header_buf = storage.read_at(pos, header_buf).await?;
-            let header: PrepareHeader =
-                *bytemuck::checked::from_bytes::<PrepareHeader>(&header_buf);
+            // `try_from_bytes` so an invalid bit pattern (e.g. a corrupt
+            // `command` byte yielding no `Command2` variant) routes through
+            // `truncate_or_fail` instead of panicking the recovery thread.
+            let Ok(header_ref) = 
bytemuck::checked::try_from_bytes::<PrepareHeader>(&header_buf)
+            else {
+                truncate_or_fail(&storage, pos, "corrupt header (invalid bit 
pattern)").await?;
+                break;
+            };
+            let header: PrepareHeader = *header_ref;
 
             // Validate: must be a Prepare command with sane size
             if header.command != Command2::Prepare
                 || (header.size as usize) < HEADER_SIZE
                 || u64::from(header.size) > MAX_ENTRY_SIZE
             {
-                // Corrupt or non-prepare entry, truncate here
-                storage.truncate(pos).await?;
+                truncate_or_fail(&storage, pos, "corrupt or non-prepare 
entry").await?;
                 break;
             }
 
             let entry_size = u64::from(header.size);
 
+            // TODO(hubcio): verify `header.checksum` / `header.checksum_body`

Review Comment:
   Makes sense, TODO keeps it visible. CRC32/xxHash over the body at append + 
verify at scan when you're ready.



##########
core/message_bus/src/installer/replica.rs:
##########
@@ -266,6 +267,12 @@ pub fn install_replica_conn<C: TransportConn>(
     ) {
         Ok(token) => {
             install_token.set(Some(token));
+            // Stamp this shard as the owner so every shard's
+            // `send_to_replica` slow path can route to us immediately. The
+            // matching CAS-clear runs from
+            // `IggyMessageBus::notify_connection_lost` on either of the
+            // post-loop guards firing.
+            bus.mark_replica_owned(peer_id);

Review Comment:
   Confirmed — ordering makes the gap safe, and 
`frame_drops_total{reason="owner_none"}` keeps it observable. LGTM.



##########
core/server-ng/src/bootstrap.rs:
##########
@@ -42,52 +41,68 @@ use message_bus::transports::tls::{
 };
 use message_bus::{
     AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, 
AcceptedTlsClientFn,
-    AcceptedWsClientFn, IggyMessageBus, connector,
+    AcceptedWsClientFn, IggyMessageBus, ReplicaOwnerTable, connector,
 };
 use metadata::IggyMetadata;
 use metadata::MuxStateMachine;
 use metadata::impls::metadata::{IggySnapshot, StreamsFrontend};
 use metadata::impls::recovery::recover;
 use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::mux::WithFactory;
 use metadata::stm::snapshot::Snapshot;
 use metadata::stm::stream::{Partition, Streams};
 use metadata::stm::user::Users;
 use partitions::{
     IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, 
PartitionsConfig, Segment,
 };
 // TODO: decouple bootstrap/storage helpers and logging from the `server` 
crate.
-use server::bootstrap::create_directories;
+use server::bootstrap::{create_directories, create_shard_executor};
 use server::log::logger::Logging;
+use server::shard_allocator::{ShardAllocator, ShardInfo};
 use server::streaming::partitions::storage::{load_consumer_group_offsets, 
load_consumer_offsets};
 use server::streaming::segments::storage::create_segment_storage;
 use shard::builder::IggyShardBuilder;
-use shard::shards_table::PapayaShardsTable;
+use shard::metrics::ShardMetrics;
+use shard::shards_table::{PapayaShardsTable, calculate_shard_assignment};
 use shard::{
-    CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, 
channel, shard_channel,
+    CoordinatorConfig, IggyShard, PartitionConsensusConfig, Receiver as 
ShardReceiver, ShardFrame,
+    ShardIdentity, TaggedSender, channel, shard_mesh_channels,
 };
-use std::cell::{Cell, RefCell};
-use std::future::Future;
+use std::cell::RefCell;
 use std::net::{IpAddr, SocketAddr};
 use std::path::{Path, PathBuf};
 use std::rc::{Rc, Weak};
 use std::sync::Arc;
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::thread;
+use std::time::Duration;
 use tracing::{error, info, warn};
 
 const CLUSTER_ID: u128 = 1;
-const SHARD_ID: u16 = 0;
 const SHARD_REPLICA_ID: u8 = 0;

Review Comment:
   LGTM, model is clear.



##########
core/configs/src/server_config/sharding.rs:
##########
@@ -21,11 +21,69 @@ use std::str::FromStr;
 
 use configs::ConfigEnv;
 
-#[derive(Debug, Deserialize, Serialize, Default, ConfigEnv)]
+/// Default capacity of the per-shard inter-shard inbox channel. Sized
+/// comfortably above the consensus working set, which is roughly
+/// `PIPELINE_PREPARE_QUEUE_MAX (= 8) * replica_count * directions`
+/// frames in flight per shard, without allowing a runaway producer to
+/// eat unbounded memory. Tunable via `[system.sharding] inbox_capacity`
+/// in TOML.
+///
+/// The capacity must also absorb the worst-case cross-shard client
+/// Reply burst. Unlike consensus frames, client Replies have no VSR
+/// retransmit path: a Reply lost on full inbox is gone and the client
+/// times out. A reasonable lower bound is
+/// `max_inflight_client_requests / num_shards` (assuming requests are
+/// distributed evenly across owning shards) plus the consensus
+/// headroom above.
+///
+/// Consensus frames and client-reply forwards share this one channel,
+/// so the two headrooms are not independent: a consensus burst or
+/// retransmit storm can fill the inbox with consensus frames exactly
+/// when a client Reply needs the space. A single `inbox_capacity` knob
+/// cannot isolate the two frame classes - size it for the sum of both
+/// worst cases occurring together. Watch the drop-site `tracing` logs
+/// (and, once a per-shard exporter lands, the `frame_drops_total`
+/// `{variant="forward_client_send"}` counter) to detect when the bound
+/// is too low in production.
+pub const DEFAULT_INBOX_CAPACITY: usize = 1024;
+
+/// Maximum permitted per-shard inbox depth. The channel is allocated
+/// up-front per shard, so a runaway value here OOMs the process at boot.
+/// `1 << 20` (~1M frames) is several orders of magnitude above any
+/// realistic backpressure target and still fits comfortably in process
+/// address space.
+pub const INBOX_CAPACITY_MAX: usize = 1 << 20;
+
+const fn default_inbox_capacity() -> usize {
+    DEFAULT_INBOX_CAPACITY
+}
+
+#[derive(Debug, Deserialize, Serialize, ConfigEnv)]
 pub struct ShardingConfig {
     #[serde(default)]
     #[config_env(leaf)]
     pub cpu_allocation: CpuAllocation,
+    /// Per-shard inter-shard inbox channel capacity. Bounded by design.
+    /// Drops on full inbox of consensus frames are recovered by VSR
+    /// retransmit. Drops of cross-shard client Reply frames are terminal:
+    /// the client never receives the reply (no in-protocol retransmit).
+    /// Both frame classes share this one channel, so a consensus burst
+    /// can starve client-reply forwards: size against the worst-case sum
+    /// of consensus working set + peak client-reply fan-out per shard
+    /// occurring together; see `DEFAULT_INBOX_CAPACITY` for the
+    /// rationale. Used by `core/server-ng`; the legacy server uses its
+    /// own hard-coded inbox sizing.
+    #[serde(default = "default_inbox_capacity")]
+    pub inbox_capacity: usize,

Review Comment:
   Metrics-first to size the split makes sense. Thanks for the TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to