This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 11295b3fe feat(shard): Implement shard router (#2853)
11295b3fe is described below
commit 11295b3fef66183ca88e01bef4a6660cb37839a6
Author: Krishna Vishal <[email protected]>
AuthorDate: Wed Mar 4 22:31:43 2026 +0530
feat(shard): Implement shard router (#2853)
---
Cargo.lock | 19 +++-
Cargo.toml | 1 +
DEPENDENCIES.md | 1 +
core/common/src/types/consensus/header.rs | 40 +++++++
core/metadata/src/impls/metadata.rs | 28 +----
core/partitions/src/iggy_partitions.rs | 7 +-
core/shard/Cargo.toml | 5 +
core/shard/src/lib.rs | 125 ++++++++++++++++-----
core/shard/src/router.rs | 176 ++++++++++++++++++++++++++++++
core/shard/src/shards_table.rs | 93 ++++++++++++++++
core/simulator/src/replica.rs | 6 +-
11 files changed, 442 insertions(+), 59 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index bbd78bdf8..e54c38866 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2420,6 +2420,18 @@ version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
+[[package]]
+name = "crossfire"
+version = "3.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3fb12e9c05ae4854f743f0acec2f817148ba59a902484f6aa298d4fc7df2fac4"
+dependencies = [
+ "crossbeam-utils",
+ "futures-core",
+ "parking_lot",
+ "smallvec",
+]
+
[[package]]
name = "crossterm"
version = "0.29.0"
@@ -8727,7 +8739,7 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys 0.4.15",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -9362,11 +9374,16 @@ name = "shard"
version = "0.1.0"
dependencies = [
"consensus",
+ "crossfire",
+ "futures",
+ "hash32 1.0.0",
"iggy_common",
"journal",
"message_bus",
"metadata",
+ "papaya",
"partitions",
+ "tracing",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index dc2bb6641..3be3da952 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -127,6 +127,7 @@ configs_derive = { path = "core/configs_derive", version = "0.1.0" }
consensus = { path = "core/consensus" }
console-subscriber = "0.5.0"
crossbeam = "0.8.4"
+crossfire = "3.1.4"
ctor = "0.6.3"
ctrlc = { version = "3.5", features = ["termination"] }
cucumber = "0.22"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index acb2ba8c5..6d7c7edb7 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -210,6 +210,7 @@ crossbeam-deque: 0.8.6, "Apache-2.0 OR MIT",
crossbeam-epoch: 0.9.18, "Apache-2.0 OR MIT",
crossbeam-queue: 0.3.12, "Apache-2.0 OR MIT",
crossbeam-utils: 0.8.21, "Apache-2.0 OR MIT",
+crossfire: 3.1.4, "Apache-2.0",
crossterm: 0.29.0, "MIT",
crossterm_winapi: 0.9.1, "MIT",
crunchy: 0.2.4, "MIT",
diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs
index d8a005700..c096fa44f 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -124,6 +124,46 @@ pub enum Operation {
Reserved = 200,
}
+impl Operation {
+    /// Returns `true` for metadata / control-plane operations (streams, topics,
+    /// users, consumer groups, etc.) that are always handled by shard 0.
+    #[inline]
+    pub fn is_metadata(&self) -> bool {
+        matches!(
+            self,
+            Operation::CreateStream
+                | Operation::UpdateStream
+                | Operation::DeleteStream
+                | Operation::PurgeStream
+                | Operation::CreateTopic
+                | Operation::UpdateTopic
+                | Operation::DeleteTopic
+                | Operation::PurgeTopic
+                | Operation::CreatePartitions
+                | Operation::DeletePartitions
+                | Operation::CreateConsumerGroup
+                | Operation::DeleteConsumerGroup
+                | Operation::CreateUser
+                | Operation::UpdateUser
+                | Operation::DeleteUser
+                | Operation::ChangePassword
+                | Operation::UpdatePermissions
+                | Operation::CreatePersonalAccessToken
+                | Operation::DeletePersonalAccessToken
+        )
+    }
+
+    /// Returns `true` for data-plane operations that are routed to the shard
+    /// owning the partition identified by the message's namespace.
+    #[inline]
+    pub fn is_partition(&self) -> bool {
+        matches!(
+            self,
+            Operation::SendMessages | Operation::StoreConsumerOffset | Operation::DeleteSegments
+        )
+    }
+}
+
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct GenericHeader {
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index e7ce25e83..f5502ce06 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -25,8 +25,7 @@ use consensus::{
};
use iggy_common::{
header::{
- Command2, ConsensusHeader, GenericHeader, Operation, PrepareHeader,
PrepareOkHeader,
- RequestHeader,
+ Command2, ConsensusHeader, GenericHeader, PrepareHeader,
PrepareOkHeader, RequestHeader,
},
message::Message,
};
@@ -260,30 +259,7 @@ where
message.header().command(),
Command2::Request | Command2::Prepare | Command2::PrepareOk
));
- let operation = message.header().operation();
- // TODO: Use better selection, smth like greater or equal based on op
number.
- matches!(
- operation,
- Operation::CreateStream
- | Operation::UpdateStream
- | Operation::DeleteStream
- | Operation::PurgeStream
- | Operation::CreateTopic
- | Operation::UpdateTopic
- | Operation::DeleteTopic
- | Operation::PurgeTopic
- | Operation::CreatePartitions
- | Operation::DeletePartitions
- | Operation::CreateConsumerGroup
- | Operation::DeleteConsumerGroup
- | Operation::CreateUser
- | Operation::UpdateUser
- | Operation::DeleteUser
- | Operation::ChangePassword
- | Operation::UpdatePermissions
- | Operation::CreatePersonalAccessToken
- | Operation::DeletePersonalAccessToken
- )
+ message.header().operation().is_metadata()
}
}
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index 6c1ae865c..3ae5d4389 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -514,12 +514,7 @@ where
message.header().command(),
Command2::Request | Command2::Prepare | Command2::PrepareOk
));
- let operation = message.header().operation();
- // TODO: Use better selection, smth like greater or equal based on op
number.
- matches!(
- operation,
- Operation::DeleteSegments | Operation::SendMessages |
Operation::StoreConsumerOffset
- )
+ message.header().operation().is_partition()
}
}
diff --git a/core/shard/Cargo.toml b/core/shard/Cargo.toml
index c5b7621fd..f3f9d7e29 100644
--- a/core/shard/Cargo.toml
+++ b/core/shard/Cargo.toml
@@ -22,8 +22,13 @@ edition = "2024"
[dependencies]
consensus = { path = "../consensus" }
+crossfire = { workspace = true }
+futures = { workspace = true }
+hash32 = { workspace = true }
iggy_common = { path = "../common" }
journal = { path = "../journal" }
message_bus = { path = "../message_bus" }
metadata = { path = "../metadata" }
+papaya = { workspace = true }
partitions = { path = "../partitions" }
+tracing = { workspace = true }
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 35048fc00..3e6adc00f 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use consensus::{
- MetadataHandle, MuxPlane, NamespacedPipeline, PartitionsHandle, Plane,
PlaneIdentity,
- VsrConsensus,
-};
+mod router;
+pub mod shards_table;
+
+use consensus::{MuxPlane, NamespacedPipeline, PartitionsHandle, Plane,
VsrConsensus};
use iggy_common::header::{GenericHeader, PrepareHeader, PrepareOkHeader,
RequestHeader};
use iggy_common::message::{Message, MessageBag};
use iggy_common::sharding::IggyNamespace;
@@ -28,6 +28,7 @@ use message_bus::MessageBus;
use metadata::IggyMetadata;
use metadata::stm::StateMachine;
use partitions::IggyPartitions;
+use shards_table::ShardsTable;
pub type ShardPlane<B, J, S, M> = MuxPlane<
variadic!(
@@ -36,30 +37,116 @@ pub type ShardPlane<B, J, S, M> = MuxPlane<
),
>;
-pub struct IggyShard<B, J, S, M>
+/// Bounded mpsc channel sender (blocking send).
+pub type Sender<T> = crossfire::MTx<crossfire::mpsc::Array<T>>;
+
+/// Bounded mpsc channel receiver (async recv).
+pub type Receiver<T> = crossfire::AsyncRx<crossfire::mpsc::Array<T>>;
+
+/// Create a bounded mpsc channel with a blocking sender and async receiver.
+pub fn channel<T: Send + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
+    crossfire::mpsc::bounded_blocking_async(capacity)
+}
+
+/// Envelope for inter-shard channel messages.
+///
+/// Wraps a consensus [`Message`] together with an optional one-shot response
+/// channel. Fire-and-forget dispatches leave `response_sender` as `None`;
+/// request-response dispatches provide a sender through which the message
+/// pump reports completion (for now the pump simply drops it once done).
+///
+/// The response type `R` is generic so that higher layers (e.g. HTTP handlers)
+/// can carry a response enum while the consensus layer can default to `()`.
+pub struct ShardFrame<R: Send + 'static = ()> {
+    pub message: Message<GenericHeader>,
+    pub response_sender: Option<Sender<R>>,
+}
+
+impl<R: Send + 'static> ShardFrame<R> {
+    /// Create a fire-and-forget frame (no caller waiting for completion).
+    pub fn fire_and_forget(message: Message<GenericHeader>) -> Self {
+        Self {
+            message,
+            response_sender: None,
+        }
+    }
+
+    /// Create a request-response frame. Returns the frame and a receiver
+    /// that the caller can await for completion notification.
+    pub fn with_response(message: Message<GenericHeader>) -> (Self, Receiver<R>) {
+        let (tx, rx) = channel(1);
+        (
+            Self {
+                message,
+                response_sender: Some(tx),
+            },
+            rx,
+        )
+    }
+}
+
+pub struct IggyShard<B, J, S, M, T = (), R: Send + 'static = ()>
where
B: MessageBus,
{
- pub id: u8,
+ pub id: u16,
pub name: String,
pub plane: ShardPlane<B, J, S, M>,
+
+ /// One channel sender per shard in the cluster, indexed by shard id.
+ /// The entry for this shard points at its own inbox, so local routing
+ /// takes the same channel path as routing to a peer shard.
+ senders: Vec<Sender<ShardFrame<R>>>,
+
+ /// Receiving half of this shard's inbox, drained by the message pump;
+ /// peer shards (and this shard itself) enqueue frames through `senders`.
+ inbox: Receiver<ShardFrame<R>>,
+
+ /// Maps partition namespaces to the shard that owns them.
+ shards_table: T,
}
-impl<B, J, S, M> IggyShard<B, J, S, M>
+impl<B, J, S, M, T, R: Send + 'static> IggyShard<B, J, S, M, T, R>
where
B: MessageBus,
+ T: ShardsTable,
{
- /// Create a new shard from pre-built metadata and partition planes.
+ /// Create a new shard with channel links and a shards table.
+ ///
+ /// * `senders` - one sender per shard in the cluster (indexed by shard
id).
+ /// * `inbox` - the receiver that this shard drains in its message pump.
+ /// * `shards_table` - namespace -> shard routing table.
pub fn new(
- id: u8,
+ id: u16,
name: String,
metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
+ senders: Vec<Sender<ShardFrame<R>>>,
+ inbox: Receiver<ShardFrame<R>>,
+ shards_table: T,
) -> Self {
let plane = MuxPlane::new(variadic!(metadata, partitions));
- Self { id, name, plane }
+ Self {
+ id,
+ name,
+ plane,
+ senders,
+ inbox,
+ shards_table,
+ }
}
+ pub fn shards_table(&self) -> &T {
+ &self.shards_table
+ }
+}
+
+/// Local message processing — these methods handle messages that have been
+/// routed to this shard via the message pump.
+impl<B, J, S, M, T, R: Send + 'static> IggyShard<B, J, S, M, T, R>
+where
+ B: MessageBus,
+{
/// Dispatch an incoming network message to the appropriate consensus
plane.
///
/// Routes requests, replication messages, and acks to either the metadata
@@ -93,11 +180,7 @@ where
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
- if self.plane.metadata().is_applicable(&request) {
- self.plane.metadata().on_request(request).await;
- } else {
- self.plane.partitions().on_request(request).await;
- }
+ self.plane.on_request(request).await;
}
pub async fn on_replicate(&self, prepare: Message<PrepareHeader>)
@@ -111,11 +194,7 @@ where
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
- if self.plane.metadata().is_applicable(&prepare) {
- self.plane.metadata().on_replicate(prepare).await;
- } else {
- self.plane.partitions().on_replicate(prepare).await;
- }
+ self.plane.on_replicate(prepare).await;
}
pub async fn on_ack(&self, prepare_ok: Message<PrepareOkHeader>)
@@ -129,11 +208,7 @@ where
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
- if self.plane.metadata().is_applicable(&prepare_ok) {
- self.plane.metadata().on_ack(prepare_ok).await;
- } else {
- self.plane.partitions().on_ack(prepare_ok).await;
- }
+ self.plane.on_ack(prepare_ok).await;
}
/// Drain and dispatch loopback messages for each consensus plane.
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
new file mode 100644
index 000000000..a8b6d5637
--- /dev/null
+++ b/core/shard/src/router.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::shards_table::ShardsTable;
+use crate::{IggyShard, Receiver, ShardFrame};
+use futures::FutureExt;
+use iggy_common::header::{GenericHeader, PrepareHeader};
+use iggy_common::message::{Message, MessageBag};
+use iggy_common::sharding::IggyNamespace;
+use journal::{Journal, JournalHandle};
+use message_bus::MessageBus;
+use metadata::stm::StateMachine;
+
+/// Inter-shard dispatch logic.
+///
+/// All messages — whether destined for a local or remote shard — are routed
+/// through the channel into the target shard's message pump. This ensures
+/// that every mutation on a shard is serialized through a single point (the
+/// pump), preventing concurrent access from independent async tasks.
+impl<B, J, S, M, T, R> IggyShard<B, J, S, M, T, R>
+where
+    B: MessageBus,
+    T: ShardsTable,
+    R: Send + 'static,
+{
+    /// Classify a raw network message and enqueue it on the owning shard's
+    /// message pump without waiting for it to be processed.
+    ///
+    /// If the frame cannot be delivered (no sender for the target shard, or
+    /// the target inbox is disconnected) it is dropped and a warning is logged.
+    pub fn dispatch(&self, message: Message<GenericHeader>) {
+        let (target, generic) = self.resolve_route(message);
+        self.enqueue_frame(target, ShardFrame::fire_and_forget(generic));
+    }
+
+    /// Dispatch a message and return a receiver that resolves when the target
+    /// shard has finished processing it.
+    ///
+    /// If the frame cannot be delivered it is dropped together with its
+    /// response sender, so the returned receiver observes a disconnect.
+    pub fn dispatch_request(&self, message: Message<GenericHeader>) -> Receiver<R> {
+        let (target, generic) = self.resolve_route(message);
+        let (frame, rx) = ShardFrame::<R>::with_response(generic);
+        self.enqueue_frame(target, frame);
+        rx
+    }
+
+    /// Decompose a generic message into its typed form (Request, Prepare, or
+    /// PrepareOk) and resolve the shard that should handle it.
+    ///
+    /// - Metadata operations always route to shard 0 (the control plane).
+    /// - Partition operations route to the shard that owns the namespace,
+    ///   looked up via the [`ShardsTable`]; unregistered namespaces fall back
+    ///   to shard 0 with a warning.
+    /// - Any other operation falls back to shard 0.
+    ///
+    /// Returns the target shard id and the message in its generic form.
+    fn resolve_route(&self, message: Message<GenericHeader>) -> (u16, Message<GenericHeader>) {
+        // NOTE(review): the typed message is cloned back into generic form; a
+        // by-value conversion would avoid the copy if the message API offers one.
+        let (operation, namespace, generic) = match MessageBag::from(message) {
+            MessageBag::Request(ref r) => {
+                let h = r.header();
+                (h.operation, h.namespace, r.as_generic().clone())
+            }
+            MessageBag::Prepare(ref p) => {
+                let h = p.header();
+                (h.operation, h.namespace, p.as_generic().clone())
+            }
+            MessageBag::PrepareOk(ref p) => {
+                let h = p.header();
+                (h.operation, h.namespace, p.as_generic().clone())
+            }
+        };
+        let namespace = IggyNamespace::from_raw(namespace);
+        let target = if operation.is_metadata() {
+            0
+        } else if operation.is_partition() {
+            self.shards_table.shard_for(namespace).unwrap_or_else(|| {
+                tracing::warn!(
+                    shard = self.id,
+                    stream = namespace.stream_id(),
+                    topic = namespace.topic_id(),
+                    partition = namespace.partition_id(),
+                    "namespace not found in shards_table, falling back to shard 0"
+                );
+                0
+            })
+        } else {
+            0
+        };
+        (target, generic)
+    }
+
+    /// Enqueue `frame` on the inbox of shard `target`.
+    ///
+    /// The sender is the blocking half of a bounded channel, so this call
+    /// blocks while the target inbox is full.
+    // NOTE(review): a shard dispatching to itself from inside its own pump
+    // while its inbox is full would never unblock — confirm callers avoid it.
+    fn enqueue_frame(&self, target: u16, frame: ShardFrame<R>) {
+        let Some(sender) = self.senders.get(usize::from(target)) else {
+            tracing::warn!(
+                shard = self.id,
+                target_shard = target,
+                "no sender registered for target shard, dropping message"
+            );
+            return;
+        };
+        if sender.send(frame).is_err() {
+            tracing::warn!(
+                shard = self.id,
+                target_shard = target,
+                "target shard inbox is disconnected, dropping message"
+            );
+        }
+    }
+
+    /// Drain this shard's inbox and process each frame locally.
+    ///
+    /// The loop exits once `stop` resolves (with a value or with an error,
+    /// e.g. after its sender is dropped) or once the inbox reports an error.
+    /// Frames still queued at that point are drained and processed before
+    /// returning.
+    pub async fn run_message_pump(&self, stop: Receiver<()>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+        J: JournalHandle,
+        <J as JournalHandle>::Target: Journal<
+            <J as JournalHandle>::Storage,
+            Entry = Message<PrepareHeader>,
+            Header = PrepareHeader,
+        >,
+        M: StateMachine<Input = Message<PrepareHeader>>,
+    {
+        loop {
+            futures::select! {
+                _ = stop.recv().fuse() => break,
+                frame = self.inbox.recv().fuse() => {
+                    match frame {
+                        Ok(frame) => self.process_frame(frame).await,
+                        Err(_) => break,
+                    }
+                }
+            }
+        }
+
+        // Drain frames that were still queued when the loop exited.
+        while let Ok(frame) = self.inbox.try_recv() {
+            self.process_frame(frame).await;
+        }
+    }
+
+    /// Run a single frame through `on_message`, then release its response
+    /// sender.
+    async fn process_frame(&self, frame: ShardFrame<R>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+        J: JournalHandle,
+        <J as JournalHandle>::Target: Journal<
+            <J as JournalHandle>::Storage,
+            Entry = Message<PrepareHeader>,
+            Header = PrepareHeader,
+        >,
+        M: StateMachine<Input = Message<PrepareHeader>>,
+    {
+        self.on_message(frame.message).await;
+        // TODO: once on_message returns an R (e.g. ShardResponse), send it
+        // back via frame.response_sender. For now the sender is dropped and
+        // the caller's receiver will observe a disconnect.
+        drop(frame.response_sender);
+    }
+}
diff --git a/core/shard/src/shards_table.rs b/core/shard/src/shards_table.rs
new file mode 100644
index 000000000..719f52e10
--- /dev/null
+++ b/core/shard/src/shards_table.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use hash32::{Hasher, Murmur3Hasher};
+use iggy_common::sharding::{IggyNamespace, PartitionLocation};
+use std::hash::Hasher as _;
+
+/// Lookup table that maps partition namespaces to their owning shard.
+///
+/// Implementations can be:
+/// - A shared concurrent map (DashMap, papaya, etc.) referenced by all shards.
+/// - A per-shard local `HashMap` replica, updated via
+///   broadcast when partitions are created, deleted, or moved.
+pub trait ShardsTable {
+    /// Returns the shard id that owns `namespace`, or `None` if the
+    /// namespace is not yet registered (partition not created or update
+    /// hasn't propagated).
+    fn shard_for(&self, namespace: IggyNamespace) -> Option<u16>;
+}
+
+/// No-op shards table for single-shard setups.
+impl ShardsTable for () {
+    fn shard_for(&self, _namespace: IggyNamespace) -> Option<u16> {
+        None
+    }
+}
+
+/// Lock-free shards table backed by [`papaya::HashMap`].
+pub struct PapayaShardsTable {
+    inner: papaya::HashMap<IggyNamespace, PartitionLocation>,
+}
+
+impl Default for PapayaShardsTable {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl PapayaShardsTable {
+    /// Create an empty table.
+    pub fn new() -> Self {
+        Self {
+            inner: papaya::HashMap::new(),
+        }
+    }
+
+    /// Create an empty table pre-sized for `capacity` entries.
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            inner: papaya::HashMap::with_capacity(capacity),
+        }
+    }
+
+    /// Record `location` as the owner of `namespace`, replacing any previous entry.
+    pub fn insert(&self, namespace: IggyNamespace, location: PartitionLocation) {
+        self.inner.pin().insert(namespace, location);
+    }
+
+    /// Remove `namespace`, returning the location it was mapped to, if any.
+    pub fn remove(&self, namespace: &IggyNamespace) -> Option<PartitionLocation> {
+        let guard = self.inner.guard();
+        self.inner.remove(namespace, &guard).copied()
+    }
+}
+
+impl ShardsTable for PapayaShardsTable {
+    fn shard_for(&self, namespace: IggyNamespace) -> Option<u16> {
+        let guard = self.inner.guard();
+        self.inner.get(&namespace, &guard).map(|loc| *loc.shard_id)
+    }
+}
+
+/// Deterministic partition-to-shard assignment using Murmur3 hash.
+///
+/// Given a packed `IggyNamespace` and the total number of shards, returns the
+/// shard id that should own the partition. The upper bits of the Murmur3 hash
+/// are used to avoid the weak lower bits for small integer inputs.
+///
+/// The namespace is fed to the hasher as little-endian bytes so the result is
+/// identical on every platform; `Hasher::write_u64` would use native byte
+/// order and assign partitions differently on big-endian hosts.
+///
+/// # Panics
+///
+/// Panics if `shard_count` is zero.
+pub fn calculate_shard_assignment(ns: &IggyNamespace, shard_count: u32) -> u16 {
+    assert!(shard_count > 0, "shard_count must be greater than zero");
+    let mut hasher = Murmur3Hasher::default();
+    hasher.write(&ns.inner().to_le_bytes());
+    let hash = hasher.finish32();
+    // `hash >> 16` is below 2^16, so the remainder always fits in a `u16`.
+    ((hash >> 16) % shard_count) as u16
+}
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 7b3d9ee7f..ddd423f11 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -82,5 +82,11 @@ pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8) ->
     partition_consensus.init();
     partitions.set_consensus(partition_consensus);
 
-    shard::IggyShard::new(id, name, metadata, partitions)
+    // TODO: this previously used an unbounded flume channel, but crossfire's
+    // `Flavor` trait makes an unbounded variant impossible without mangling
+    // the channel types. Revisit in the future.
+    // The `()` shards table routes every message to shard 0, so `senders[0]`
+    // must be this shard's own inbox; an empty Vec leaves dispatch nowhere to go.
+    let (tx, inbox) = shard::channel(1024);
+    shard::IggyShard::new(u16::from(id), name, metadata, partitions, vec![tx], inbox, ())
 }