jayakasadev commented on code in PR #3269:
URL: https://github.com/apache/iggy/pull/3269#discussion_r3261790565


##########
core/journal/src/prepare_journal.rs:
##########
@@ -143,27 +186,40 @@ impl PrepareJournal {
         while pos + HEADER_SIZE as u64 <= file_len {
             // Read the 256-byte header
             header_buf = storage.read_at(pos, header_buf).await?;
-            let header: PrepareHeader =
-                *bytemuck::checked::from_bytes::<PrepareHeader>(&header_buf);
+            // `try_from_bytes` so an invalid bit pattern (e.g. a corrupt
+            // `command` byte yielding no `Command2` variant) routes through
+            // `truncate_or_fail` instead of panicking the recovery thread.
+            let Ok(header_ref) = 
bytemuck::checked::try_from_bytes::<PrepareHeader>(&header_buf)
+            else {
+                truncate_or_fail(&storage, pos, "corrupt header (invalid bit 
pattern)").await?;
+                break;
+            };
+            let header: PrepareHeader = *header_ref;
 
             // Validate: must be a Prepare command with sane size
             if header.command != Command2::Prepare
                 || (header.size as usize) < HEADER_SIZE
                 || u64::from(header.size) > MAX_ENTRY_SIZE
             {
-                // Corrupt or non-prepare entry, truncate here
-                storage.truncate(pos).await?;
+                truncate_or_fail(&storage, pos, "corrupt or non-prepare 
entry").await?;
                 break;
             }
 
             let entry_size = u64::from(header.size);
 
+            // TODO(hubcio): verify `header.checksum` / `header.checksum_body`

Review Comment:
   Is there a tracking issue for this? A body bit-flip that leaves the header 
structurally valid will pass the scan undetected and be replayed as committed 
state with no error.



##########
core/message_bus/src/installer/replica.rs:
##########
@@ -266,6 +267,12 @@ pub fn install_replica_conn<C: TransportConn>(
     ) {
         Ok(token) => {
             install_token.set(Some(token));
+            // Stamp this shard as the owner so every shard's
+            // `send_to_replica` slow path can route to us immediately. The
+            // matching CAS-clear runs from
+            // `IggyMessageBus::notify_connection_lost` on either of the
+            // post-loop guards firing.
+            bus.mark_replica_owned(peer_id);

Review Comment:
   There's a gap between `delegate_replica` returning `Ok(target)` in the 
coordinator and this line executing on the target shard. During that window, 
`send_to_replica` for this replica id reads `OWNER_NONE` and drops the frame 
silently. In practice this is likely safe — the replica has to complete login 
and session registration before any VSR message is addressed to it, so this 
stamp should fire well before then. Is that the assumed invariant?



##########
core/configs/src/server_config/sharding.rs:
##########
@@ -21,11 +21,69 @@ use std::str::FromStr;
 
 use configs::ConfigEnv;
 
-#[derive(Debug, Deserialize, Serialize, Default, ConfigEnv)]
+/// Default capacity of the per-shard inter-shard inbox channel. Sized
+/// comfortably above the consensus working set, which is roughly
+/// `PIPELINE_PREPARE_QUEUE_MAX (= 8) * replica_count * directions`
+/// frames in flight per shard, without allowing a runaway producer to
+/// eat unbounded memory. Tunable via `[system.sharding] inbox_capacity`
+/// in TOML.
+///
+/// The capacity must also absorb the worst-case cross-shard client
+/// Reply burst. Unlike consensus frames, client Replies have no VSR
+/// retransmit path: a Reply lost on full inbox is gone and the client
+/// times out. A reasonable lower bound is
+/// `max_inflight_client_requests / num_shards` (assuming requests are
+/// distributed evenly across owning shards) plus the consensus
+/// headroom above.
+///
+/// Consensus frames and client-reply forwards share this one channel,
+/// so the two headrooms are not independent: a consensus burst or
+/// retransmit storm can fill the inbox with consensus frames exactly
+/// when a client Reply needs the space. A single `inbox_capacity` knob
+/// cannot isolate the two frame classes - size it for the sum of both
+/// worst cases occurring together. Watch the drop-site `tracing` logs
+/// (and, once a per-shard exporter lands, the `frame_drops_total`
+/// `{variant="forward_client_send"}` counter) to detect when the bound
+/// is too low in production.
+pub const DEFAULT_INBOX_CAPACITY: usize = 1024;
+
+/// Maximum permitted per-shard inbox depth. The channel is allocated
+/// up-front per shard, so a runaway value here OOMs the process at boot.
+/// `1 << 20` (~1M frames) is several orders of magnitude above any
+/// realistic backpressure target and still fits comfortably in process
+/// address space.
+pub const INBOX_CAPACITY_MAX: usize = 1 << 20;
+
+const fn default_inbox_capacity() -> usize {
+    DEFAULT_INBOX_CAPACITY
+}
+
+#[derive(Debug, Deserialize, Serialize, ConfigEnv)]
 pub struct ShardingConfig {
     #[serde(default)]
     #[config_env(leaf)]
     pub cpu_allocation: CpuAllocation,
+    /// Per-shard inter-shard inbox channel capacity. Bounded by design.
+    /// Drops on full inbox of consensus frames are recovered by VSR
+    /// retransmit. Drops of cross-shard client Reply frames are terminal:
+    /// the client never receives the reply (no in-protocol retransmit).
+    /// Both frame classes share this one channel, so a consensus burst
+    /// can starve client-reply forwards: size against the worst-case sum
+    /// of consensus working set + peak client-reply fan-out per shard
+    /// occurring together; see `DEFAULT_INBOX_CAPACITY` for the
+    /// rationale. Used by `core/server-ng`; the legacy server uses its
+    /// own hard-coded inbox sizing.
+    #[serde(default = "default_inbox_capacity")]
+    pub inbox_capacity: usize,

Review Comment:
   Consensus frames and client-reply forwards share this channel with no 
prioritization, so a VSR retransmit storm can starve client replies — the doc 
acknowledges this. Is there a tracking issue for separate priority lanes, or is 
that intentionally out of scope for now?



##########
core/journal/src/prepare_journal.rs:
##########
@@ -387,6 +437,18 @@ impl Journal<FileStorage> for PrepareJournal {
         // Reopen the file descriptor at the same path.
         self.storage.reopen().await?;
 
+        // Advance the snapshot watermark only AFTER the WAL rewrite is
+        // durable (tmp create -> write -> fsync -> rename -> fsync parent
+        // -> reopen). Advancing earlier would leave `snapshot_op` past
+        // entries still present on disk on any `?` failure above, letting
+        // a future `append()` pass the slot collision check at
+        // `existing.op <= snapshot_op` and silently evict a live entry
+        // from the index. The entry would survive on disk but become
+        // unreachable, stalling `RetransmitPrepares` until view change.
+        if end_op > self.snapshot_op.get() {
+            self.snapshot_op.set(end_op);

Review Comment:
   This is a correctness fix, not just a refactor. Previously `snapshot_op` 
advanced before the compacted WAL was durably renamed into place. A crash 
between those two points would leave the watermark past entries still in the 
old file — on the next boot, `append()` would see those ops as below the 
snapshot watermark, pass the slot-collision check at `existing.op <= 
snapshot_op`, and silently drop live index entries, making those prepares 
permanently unreachable to `RetransmitPrepares`.



##########
core/server-ng/src/bootstrap.rs:
##########
@@ -42,52 +41,68 @@ use message_bus::transports::tls::{
 };
 use message_bus::{
     AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, 
AcceptedTlsClientFn,
-    AcceptedWsClientFn, IggyMessageBus, connector,
+    AcceptedWsClientFn, IggyMessageBus, ReplicaOwnerTable, connector,
 };
 use metadata::IggyMetadata;
 use metadata::MuxStateMachine;
 use metadata::impls::metadata::{IggySnapshot, StreamsFrontend};
 use metadata::impls::recovery::recover;
 use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::mux::WithFactory;
 use metadata::stm::snapshot::Snapshot;
 use metadata::stm::stream::{Partition, Streams};
 use metadata::stm::user::Users;
 use partitions::{
     IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, 
PartitionsConfig, Segment,
 };
 // TODO: decouple bootstrap/storage helpers and logging from the `server` 
crate.
-use server::bootstrap::create_directories;
+use server::bootstrap::{create_directories, create_shard_executor};
 use server::log::logger::Logging;
+use server::shard_allocator::{ShardAllocator, ShardInfo};
 use server::streaming::partitions::storage::{load_consumer_group_offsets, 
load_consumer_offsets};
 use server::streaming::segments::storage::create_segment_storage;
 use shard::builder::IggyShardBuilder;
-use shard::shards_table::PapayaShardsTable;
+use shard::metrics::ShardMetrics;
+use shard::shards_table::{PapayaShardsTable, calculate_shard_assignment};
 use shard::{
-    CoordinatorConfig, IggyShard, PartitionConsensusConfig, ShardIdentity, 
channel, shard_channel,
+    CoordinatorConfig, IggyShard, PartitionConsensusConfig, Receiver as 
ShardReceiver, ShardFrame,
+    ShardIdentity, TaggedSender, channel, shard_mesh_channels,
 };
-use std::cell::{Cell, RefCell};
-use std::future::Future;
+use std::cell::RefCell;
 use std::net::{IpAddr, SocketAddr};
 use std::path::{Path, PathBuf};
 use std::rc::{Rc, Weak};
 use std::sync::Arc;
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::thread;
+use std::time::Duration;
 use tracing::{error, info, warn};
 
 const CLUSTER_ID: u128 = 1;
-const SHARD_ID: u16 = 0;
 const SHARD_REPLICA_ID: u8 = 0;

Review Comment:
   Confirming my read: this is the single-node fallback — in cluster mode 
`current_replica_id` from the CLI is used instead, and all shards on the same 
node share the same replica id because shards are intra-node partition 
assignments, not VSR replicas. Is that the intended model?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to