This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 11295b3fe feat(shard): Implement shard router (#2853)
11295b3fe is described below

commit 11295b3fef66183ca88e01bef4a6660cb37839a6
Author: Krishna Vishal <[email protected]>
AuthorDate: Wed Mar 4 22:31:43 2026 +0530

    feat(shard): Implement shard router (#2853)
---
 Cargo.lock                                |  19 +++-
 Cargo.toml                                |   1 +
 DEPENDENCIES.md                           |   1 +
 core/common/src/types/consensus/header.rs |  40 +++++++
 core/metadata/src/impls/metadata.rs       |  28 +----
 core/partitions/src/iggy_partitions.rs    |   7 +-
 core/shard/Cargo.toml                     |   5 +
 core/shard/src/lib.rs                     | 125 ++++++++++++++++-----
 core/shard/src/router.rs                  | 176 ++++++++++++++++++++++++++++++
 core/shard/src/shards_table.rs            |  93 ++++++++++++++++
 core/simulator/src/replica.rs             |   6 +-
 11 files changed, 442 insertions(+), 59 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index bbd78bdf8..e54c38866 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2420,6 +2420,18 @@ version = "0.8.21"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
 
+[[package]]
+name = "crossfire"
+version = "3.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3fb12e9c05ae4854f743f0acec2f817148ba59a902484f6aa298d4fc7df2fac4"
+dependencies = [
+ "crossbeam-utils",
+ "futures-core",
+ "parking_lot",
+ "smallvec",
+]
+
 [[package]]
 name = "crossterm"
 version = "0.29.0"
@@ -8727,7 +8739,7 @@ dependencies = [
  "errno",
  "libc",
  "linux-raw-sys 0.4.15",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -9362,11 +9374,16 @@ name = "shard"
 version = "0.1.0"
 dependencies = [
  "consensus",
+ "crossfire",
+ "futures",
+ "hash32 1.0.0",
  "iggy_common",
  "journal",
  "message_bus",
  "metadata",
+ "papaya",
  "partitions",
+ "tracing",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index dc2bb6641..3be3da952 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -127,6 +127,7 @@ configs_derive = { path = "core/configs_derive", version = 
"0.1.0" }
 consensus = { path = "core/consensus" }
 console-subscriber = "0.5.0"
 crossbeam = "0.8.4"
+crossfire = "3.1.4"
 ctor = "0.6.3"
 ctrlc = { version = "3.5", features = ["termination"] }
 cucumber = "0.22"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index acb2ba8c5..6d7c7edb7 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -210,6 +210,7 @@ crossbeam-deque: 0.8.6, "Apache-2.0 OR MIT",
 crossbeam-epoch: 0.9.18, "Apache-2.0 OR MIT",
 crossbeam-queue: 0.3.12, "Apache-2.0 OR MIT",
 crossbeam-utils: 0.8.21, "Apache-2.0 OR MIT",
+crossfire: 3.1.4, "Apache-2.0",
 crossterm: 0.29.0, "MIT",
 crossterm_winapi: 0.9.1, "MIT",
 crunchy: 0.2.4, "MIT",
diff --git a/core/common/src/types/consensus/header.rs 
b/core/common/src/types/consensus/header.rs
index d8a005700..c096fa44f 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -124,6 +124,46 @@ pub enum Operation {
     Reserved = 200,
 }
 
+impl Operation {
+    /// Returns `true` for metadata / control-plane operations (streams, 
topics,
+    /// users, consumer groups, etc.) that are always handled by shard 0.
+    #[inline]
+    pub fn is_metadata(&self) -> bool {
+        matches!(
+            self,
+            Operation::CreateStream
+                | Operation::UpdateStream
+                | Operation::DeleteStream
+                | Operation::PurgeStream
+                | Operation::CreateTopic
+                | Operation::UpdateTopic
+                | Operation::DeleteTopic
+                | Operation::PurgeTopic
+                | Operation::CreatePartitions
+                | Operation::DeletePartitions
+                | Operation::CreateConsumerGroup
+                | Operation::DeleteConsumerGroup
+                | Operation::CreateUser
+                | Operation::UpdateUser
+                | Operation::DeleteUser
+                | Operation::ChangePassword
+                | Operation::UpdatePermissions
+                | Operation::CreatePersonalAccessToken
+                | Operation::DeletePersonalAccessToken
+        )
+    }
+
+    /// Returns `true` for data-plane operations that are routed to the shard
+    /// owning the partition identified by the message's namespace.
+    #[inline]
+    pub fn is_partition(&self) -> bool {
+        matches!(
+            self,
+            Operation::SendMessages | Operation::StoreConsumerOffset | 
Operation::DeleteSegments
+        )
+    }
+}
+
 #[repr(C)]
 #[derive(Debug, Clone, Copy)]
 pub struct GenericHeader {
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index e7ce25e83..f5502ce06 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -25,8 +25,7 @@ use consensus::{
 };
 use iggy_common::{
     header::{
-        Command2, ConsensusHeader, GenericHeader, Operation, PrepareHeader, 
PrepareOkHeader,
-        RequestHeader,
+        Command2, ConsensusHeader, GenericHeader, PrepareHeader, 
PrepareOkHeader, RequestHeader,
     },
     message::Message,
 };
@@ -260,30 +259,7 @@ where
             message.header().command(),
             Command2::Request | Command2::Prepare | Command2::PrepareOk
         ));
-        let operation = message.header().operation();
-        // TODO: Use better selection, smth like greater or equal based on op 
number.
-        matches!(
-            operation,
-            Operation::CreateStream
-                | Operation::UpdateStream
-                | Operation::DeleteStream
-                | Operation::PurgeStream
-                | Operation::CreateTopic
-                | Operation::UpdateTopic
-                | Operation::DeleteTopic
-                | Operation::PurgeTopic
-                | Operation::CreatePartitions
-                | Operation::DeletePartitions
-                | Operation::CreateConsumerGroup
-                | Operation::DeleteConsumerGroup
-                | Operation::CreateUser
-                | Operation::UpdateUser
-                | Operation::DeleteUser
-                | Operation::ChangePassword
-                | Operation::UpdatePermissions
-                | Operation::CreatePersonalAccessToken
-                | Operation::DeletePersonalAccessToken
-        )
+        message.header().operation().is_metadata()
     }
 }
 
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 6c1ae865c..3ae5d4389 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -514,12 +514,7 @@ where
             message.header().command(),
             Command2::Request | Command2::Prepare | Command2::PrepareOk
         ));
-        let operation = message.header().operation();
-        // TODO: Use better selection, smth like greater or equal based on op 
number.
-        matches!(
-            operation,
-            Operation::DeleteSegments | Operation::SendMessages | 
Operation::StoreConsumerOffset
-        )
+        message.header().operation().is_partition()
     }
 }
 
diff --git a/core/shard/Cargo.toml b/core/shard/Cargo.toml
index c5b7621fd..f3f9d7e29 100644
--- a/core/shard/Cargo.toml
+++ b/core/shard/Cargo.toml
@@ -22,8 +22,13 @@ edition = "2024"
 
 [dependencies]
 consensus = { path = "../consensus" }
+crossfire = { workspace = true }
+futures = { workspace = true }
+hash32 = { workspace = true }
 iggy_common = { path = "../common" }
 journal = { path = "../journal" }
 message_bus = { path = "../message_bus" }
 metadata = { path = "../metadata" }
+papaya = { workspace = true }
 partitions = { path = "../partitions" }
+tracing = { workspace = true }
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 35048fc00..3e6adc00f 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use consensus::{
-    MetadataHandle, MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, 
PlaneIdentity,
-    VsrConsensus,
-};
+mod router;
+pub mod shards_table;
+
+use consensus::{MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, 
VsrConsensus};
 use iggy_common::header::{GenericHeader, PrepareHeader, PrepareOkHeader, 
RequestHeader};
 use iggy_common::message::{Message, MessageBag};
 use iggy_common::sharding::IggyNamespace;
@@ -28,6 +28,7 @@ use message_bus::MessageBus;
 use metadata::IggyMetadata;
 use metadata::stm::StateMachine;
 use partitions::IggyPartitions;
+use shards_table::ShardsTable;
 
 pub type ShardPlane<B, J, S, M> = MuxPlane<
     variadic!(
@@ -36,30 +37,116 @@ pub type ShardPlane<B, J, S, M> = MuxPlane<
     ),
 >;
 
-pub struct IggyShard<B, J, S, M>
+/// Bounded mpsc channel sender (blocking send).
+pub type Sender<T> = crossfire::MTx<crossfire::mpsc::Array<T>>;
+
+/// Bounded mpsc channel receiver (async recv).
+pub type Receiver<T> = crossfire::AsyncRx<crossfire::mpsc::Array<T>>;
+
+/// Create a bounded mpsc channel with a blocking sender and async receiver.
+pub fn channel<T: Send + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) 
{
+    crossfire::mpsc::bounded_blocking_async(capacity)
+}
+
+/// Envelope for inter-shard channel messages.
+///
+/// Wraps a consensus [`Message`] together with an optional one-shot response
+/// channel.  Fire-and-forget dispatches leave `response_sender` as `None`;
+/// request-response dispatches provide a sender that the message pump will
+/// notify once the message has been processed.
+///
+/// The response type `R` is generic so that higher layers (e.g. HTTP handlers)
+/// can carry a response enum while the consensus layer can default to `()`.
+pub struct ShardFrame<R: Send + 'static = ()> {
+    pub message: Message<GenericHeader>,
+    pub response_sender: Option<Sender<R>>,
+}
+
+impl<R: Send + 'static> ShardFrame<R> {
+    /// Create a fire-and-forget frame (no caller waiting for completion).
+    pub fn fire_and_forget(message: Message<GenericHeader>) -> Self {
+        Self {
+            message,
+            response_sender: None,
+        }
+    }
+
+    /// Create a request-response frame.  Returns the frame and a receiver
+    /// that the caller can await for completion notification.
+    pub fn with_response(message: Message<GenericHeader>) -> (Self, 
Receiver<R>) {
+        let (tx, rx) = channel(1);
+        (
+            Self {
+                message,
+                response_sender: Some(tx),
+            },
+            rx,
+        )
+    }
+}
+
+pub struct IggyShard<B, J, S, M, T = (), R: Send + 'static = ()>
 where
     B: MessageBus,
 {
-    pub id: u8,
+    pub id: u16,
     pub name: String,
     pub plane: ShardPlane<B, J, S, M>,
+
+    /// Channel senders to every shard, indexed by shard id.
+    /// Includes a sender to self so that local routing goes through the
+    /// same channel path as remote routing.
+    senders: Vec<Sender<ShardFrame<R>>>,
+
+    /// Receiver end of this shard's inbox.  Peer shards (and self) send
+    /// messages here via the corresponding sender.
+    inbox: Receiver<ShardFrame<R>>,
+
+    /// Partition namespace -> owning shard lookup.
+    shards_table: T,
 }
 
-impl<B, J, S, M> IggyShard<B, J, S, M>
+impl<B, J, S, M, T, R: Send + 'static> IggyShard<B, J, S, M, T, R>
 where
     B: MessageBus,
+    T: ShardsTable,
 {
-    /// Create a new shard from pre-built metadata and partition planes.
+    /// Create a new shard with channel links and a shards table.
+    ///
+    /// * `senders` - one sender per shard in the cluster (indexed by shard 
id).
+    /// * `inbox` - the receiver that this shard drains in its message pump.
+    /// * `shards_table` - namespace -> shard routing table.
     pub fn new(
-        id: u8,
+        id: u16,
         name: String,
         metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
         partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
+        senders: Vec<Sender<ShardFrame<R>>>,
+        inbox: Receiver<ShardFrame<R>>,
+        shards_table: T,
     ) -> Self {
         let plane = MuxPlane::new(variadic!(metadata, partitions));
-        Self { id, name, plane }
+        Self {
+            id,
+            name,
+            plane,
+            senders,
+            inbox,
+            shards_table,
+        }
     }
 
+    pub fn shards_table(&self) -> &T {
+        &self.shards_table
+    }
+}
+
+/// Local message processing — these methods handle messages that have been
+/// routed to this shard via the message pump.
+impl<B, J, S, M, T, R: Send + 'static> IggyShard<B, J, S, M, T, R>
+where
+    B: MessageBus,
+{
     /// Dispatch an incoming network message to the appropriate consensus 
plane.
     ///
     /// Routes requests, replication messages, and acks to either the metadata
@@ -93,11 +180,7 @@ where
             >,
         M: StateMachine<Input = Message<PrepareHeader>>,
     {
-        if self.plane.metadata().is_applicable(&request) {
-            self.plane.metadata().on_request(request).await;
-        } else {
-            self.plane.partitions().on_request(request).await;
-        }
+        self.plane.on_request(request).await;
     }
 
     pub async fn on_replicate(&self, prepare: Message<PrepareHeader>)
@@ -111,11 +194,7 @@ where
             >,
         M: StateMachine<Input = Message<PrepareHeader>>,
     {
-        if self.plane.metadata().is_applicable(&prepare) {
-            self.plane.metadata().on_replicate(prepare).await;
-        } else {
-            self.plane.partitions().on_replicate(prepare).await;
-        }
+        self.plane.on_replicate(prepare).await;
     }
 
     pub async fn on_ack(&self, prepare_ok: Message<PrepareOkHeader>)
@@ -129,11 +208,7 @@ where
             >,
         M: StateMachine<Input = Message<PrepareHeader>>,
     {
-        if self.plane.metadata().is_applicable(&prepare_ok) {
-            self.plane.metadata().on_ack(prepare_ok).await;
-        } else {
-            self.plane.partitions().on_ack(prepare_ok).await;
-        }
+        self.plane.on_ack(prepare_ok).await;
     }
 
     /// Drain and dispatch loopback messages for each consensus plane.
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
new file mode 100644
index 000000000..a8b6d5637
--- /dev/null
+++ b/core/shard/src/router.rs
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::shards_table::ShardsTable;
+use crate::{IggyShard, Receiver, ShardFrame};
+use futures::FutureExt;
+use iggy_common::header::{GenericHeader, PrepareHeader};
+use iggy_common::message::{Message, MessageBag};
+use iggy_common::sharding::IggyNamespace;
+use journal::{Journal, JournalHandle};
+use message_bus::MessageBus;
+use metadata::stm::StateMachine;
+
+/// Inter-shard dispatch logic.
+///
+/// All messages — whether destined for a local or remote shard — are routed
+/// through the channel into the target shard's message pump.  This ensures
+/// that every mutation on a shard is serialized through a single point (the
+/// pump), preventing concurrent access from independent async tasks.
+impl<B, J, S, M, T, R> IggyShard<B, J, S, M, T, R>
+where
+    B: MessageBus,
+    T: ShardsTable,
+    R: Send + 'static,
+{
+    /// Classify a raw network message and route it to
+    /// the correct shard's message pump.
+    ///
+    /// Decomposes the generic message into its typed form (Request, Prepare,
+    /// or PrepareOk) to access the operation and namespace, then resolves
+    /// the target shard and enqueues the message via its channel sender.
+    pub fn dispatch(&self, message: Message<GenericHeader>) {
+        let (operation, namespace, generic) = match MessageBag::from(message) {
+            MessageBag::Request(ref r) => {
+                let h = r.header();
+                (h.operation, h.namespace, r.as_generic().clone())
+            }
+            MessageBag::Prepare(ref p) => {
+                let h = p.header();
+                (h.operation, h.namespace, p.as_generic().clone())
+            }
+            MessageBag::PrepareOk(ref p) => {
+                let h = p.header();
+                (h.operation, h.namespace, p.as_generic().clone())
+            }
+        };
+        let namespace = IggyNamespace::from_raw(namespace);
+        let target = if operation.is_metadata() {
+            0
+        } else if operation.is_partition() {
+            self.shards_table.shard_for(namespace).unwrap_or_else(|| {
+                tracing::warn!(
+                    shard = self.id,
+                    stream = namespace.stream_id(),
+                    topic = namespace.topic_id(),
+                    partition = namespace.partition_id(),
+                    "namespace not found in shards_table, falling back to 
shard 0"
+                );
+                0
+            })
+        } else {
+            0
+        };
+        let _ = self.senders[target as 
usize].send(ShardFrame::fire_and_forget(generic));
+    }
+
+    /// Dispatch a message and return a receiver that resolves when the target
+    /// shard has finished processing it.
+    pub fn dispatch_request(&self, message: Message<GenericHeader>) -> 
Receiver<R> {
+        let (operation, namespace, generic) = match MessageBag::from(message) {
+            MessageBag::Request(ref r) => {
+                let h = r.header();
+                (h.operation, h.namespace, r.as_generic().clone())
+            }
+            MessageBag::Prepare(ref p) => {
+                let h = p.header();
+                (h.operation, h.namespace, p.as_generic().clone())
+            }
+            MessageBag::PrepareOk(ref p) => {
+                let h = p.header();
+                (h.operation, h.namespace, p.as_generic().clone())
+            }
+        };
+        let namespace = IggyNamespace::from_raw(namespace);
+
+        // Determine which shard should handle a message given its operation 
and
+        // namespace.
+        //
+        // - Metadata operations always route to shard 0 (the control plane).
+        // - Partition operations route to the shard that owns the namespace,
+        //   looked up via the [`ShardsTable`].
+        // - Unknown operations fall back to shard 0.
+        let target = if operation.is_metadata() {
+            0
+        } else if operation.is_partition() {
+            self.shards_table.shard_for(namespace).unwrap_or_else(|| {
+                tracing::warn!(
+                    shard = self.id,
+                    stream = namespace.stream_id(),
+                    topic = namespace.topic_id(),
+                    partition = namespace.partition_id(),
+                    "namespace not found in shards_table, falling back to 
shard 0"
+                );
+                0
+            })
+        } else {
+            0
+        };
+        // Create a frame and send it to the target shard.
+        let (frame, rx) = ShardFrame::<R>::with_response(generic);
+        let _ = self.senders[target as usize].send(frame);
+        rx
+    }
+
+    /// Drain this shard's inbox and process each frame locally.
+    pub async fn run_message_pump(&self, stop: Receiver<()>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        J: JournalHandle,
+        <J as JournalHandle>::Target: Journal<
+                <J as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        M: StateMachine<Input = Message<PrepareHeader>>,
+    {
+        loop {
+            futures::select! {
+                _ = stop.recv().fuse() => break,
+                frame = self.inbox.recv().fuse() => {
+                    match frame {
+                        Ok(frame) => self.process_frame(frame).await,
+                        Err(_) => break,
+                    }
+                }
+            }
+        }
+
+        // Drain remaining frames so in-flight requests get a response.
+        while let Ok(frame) = self.inbox.try_recv() {
+            self.process_frame(frame).await;
+        }
+    }
+
+    async fn process_frame(&self, frame: ShardFrame<R>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        J: JournalHandle,
+        <J as JournalHandle>::Target: Journal<
+                <J as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        M: StateMachine<Input = Message<PrepareHeader>>,
+    {
+        self.on_message(frame.message).await;
+        // TODO: once on_message returns an R (e.g. ShardResponse), send it
+        // back via frame.response_sender.  For now the sender is dropped and
+        // the caller's receiver will observe a disconnect.
+        drop(frame.response_sender);
+    }
+}
diff --git a/core/shard/src/shards_table.rs b/core/shard/src/shards_table.rs
new file mode 100644
index 000000000..719f52e10
--- /dev/null
+++ b/core/shard/src/shards_table.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use hash32::{Hasher, Murmur3Hasher};
+use iggy_common::sharding::{IggyNamespace, PartitionLocation};
+use std::hash::Hasher as _;
+
+/// Lookup table that maps partition namespaces to their owning shard.
+///
+/// Implementations can be:
+/// - A shared concurrent map (DashMap, papaya, etc.) referenced by all shards.
+/// - A per-shard local `HashMap` replica, updated via
+///   broadcast when partitions are created, deleted, or moved.
+pub trait ShardsTable {
+    /// Returns the shard id that owns `namespace`, or `None` if the
+    /// namespace is not yet registered (partition not created or update
+    /// hasn't propagated).
+    fn shard_for(&self, namespace: IggyNamespace) -> Option<u16>;
+}
+
+/// No-op shards table for single-shard setups.
+impl ShardsTable for () {
+    fn shard_for(&self, _namespace: IggyNamespace) -> Option<u16> {
+        None
+    }
+}
+
+/// Lock-free shards table backed by [`papaya::HashMap`].
+pub struct PapayaShardsTable {
+    inner: papaya::HashMap<IggyNamespace, PartitionLocation>,
+}
+
+impl Default for PapayaShardsTable {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl PapayaShardsTable {
+    pub fn new() -> Self {
+        Self {
+            inner: papaya::HashMap::new(),
+        }
+    }
+
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            inner: papaya::HashMap::with_capacity(capacity),
+        }
+    }
+
+    pub fn insert(&self, namespace: IggyNamespace, location: 
PartitionLocation) {
+        self.inner.pin().insert(namespace, location);
+    }
+
+    pub fn remove(&self, namespace: &IggyNamespace) -> 
Option<PartitionLocation> {
+        let guard = self.inner.guard();
+        self.inner.remove(namespace, &guard).copied()
+    }
+}
+
+impl ShardsTable for PapayaShardsTable {
+    fn shard_for(&self, namespace: IggyNamespace) -> Option<u16> {
+        let guard = self.inner.guard();
+        self.inner.get(&namespace, &guard).map(|loc| *loc.shard_id)
+    }
+}
+
+/// Deterministic partition-to-shard assignment using Murmur3 hash.
+///
+/// Given a packed `IggyNamespace` and the total number of shards, returns the
+/// shard id that should own the partition.  The upper bits of the Murmur3 hash
+/// are used to avoid the weak lower bits for small integer inputs.
+pub fn calculate_shard_assignment(ns: &IggyNamespace, shard_count: u32) -> u16 
{
+    let mut hasher = Murmur3Hasher::default();
+    hasher.write_u64(ns.inner());
+    let hash = hasher.finish32();
+    ((hash >> 16) % shard_count) as u16
+}
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 7b3d9ee7f..ddd423f11 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -82,5 +82,9 @@ pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>, 
replica_count: u8) ->
     partition_consensus.init();
     partitions.set_consensus(partition_consensus);
 
-    shard::IggyShard::new(id, name, metadata, partitions)
+    // TODO: previously we used an unbounded channel with flume,
+    // but this is not possible with crossfire without mangling types due to 
Flavor trait in crossfire.
+    // This needs to be revisited in the future.
+    let (_tx, inbox) = shard::channel(1024);
+    shard::IggyShard::new(id as u16, name, metadata, partitions, Vec::new(), 
inbox, ())
 }

Reply via email to