This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new f8b5c0570 feat(partitions): add partitions abstraction for clustering
(#2514)
f8b5c0570 is described below
commit f8b5c0570985de8389a273296daacbb0fd30f532
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 2 14:47:48 2026 +0100
feat(partitions): add partitions abstraction for clustering (#2514)
---
.github/workflows/_common.yml | 1 +
Cargo.lock | 7 ++
Cargo.toml | 1 +
DEPENDENCIES.md | 1 +
core/common/src/lib.rs | 1 +
core/common/src/sharding/local_idx.rs | 49 +++++++++
core/common/src/sharding/mod.rs | 29 +++++
.../src/shard => common/src/sharding}/namespace.rs | 78 ++++----------
core/common/src/sharding/partition_location.rs | 34 ++++++
core/common/src/sharding/shard_id.rs | 46 ++++++++
core/partitions/Cargo.toml | 31 ++++++
core/partitions/src/iggy_partition.rs | 27 +++++
core/partitions/src/iggy_partitions.rs | 119 +++++++++++++++++++++
core/partitions/src/lib.rs | 33 ++++++
core/partitions/src/types.rs | 68 ++++++++++++
.../handlers/messages/send_messages_handler.rs | 2 +-
.../handlers/segments/delete_segments_handler.rs | 2 +-
core/server/src/main.rs | 9 +-
core/server/src/shard/builder.rs | 8 +-
core/server/src/shard/communication.rs | 27 +++--
core/server/src/shard/mod.rs | 16 ++-
core/server/src/shard/namespace.rs | 68 ------------
core/server/src/shard/system/messages.rs | 3 +-
core/server/src/shard/system/partitions.rs | 17 +--
core/server/src/shard/transmission/id.rs | 44 --------
core/server/src/shard/transmission/mod.rs | 1 -
26 files changed, 512 insertions(+), 210 deletions(-)
diff --git a/.github/workflows/_common.yml b/.github/workflows/_common.yml
index 51b8277a6..9f8eccf2e 100644
--- a/.github/workflows/_common.yml
+++ b/.github/workflows/_common.yml
@@ -86,6 +86,7 @@ jobs:
java
js
mcp
+ partitions
proc
pr_template
python
diff --git a/Cargo.lock b/Cargo.lock
index 45a29d47d..b37ab1d22 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6496,6 +6496,13 @@ dependencies = [
"syn 2.0.111",
]
+[[package]]
+name = "partitions"
+version = "0.1.0"
+dependencies = [
+ "iggy_common",
+]
+
[[package]]
name = "passterm"
version = "2.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index c7c82a4b6..3b68471ce 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -44,6 +44,7 @@ members = [
"core/journal",
"core/message_bus",
"core/metadata",
+ "core/partitions",
"core/sdk",
"core/server",
"core/tools",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 65988b8ae..88133f67a 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -565,6 +565,7 @@ parking_lot_core: 0.9.12, "Apache-2.0 OR MIT",
parquet: 55.2.0, "Apache-2.0",
parse-display: 0.9.1, "Apache-2.0 OR MIT",
parse-display-derive: 0.9.1, "Apache-2.0 OR MIT",
+partitions: 0.1.0, "Apache-2.0",
passterm: 2.0.1, "BSD-3-Clause",
password-hash: 0.5.0, "Apache-2.0 OR MIT",
paste: 1.0.15, "Apache-2.0 OR MIT",
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 1adff1f9f..af04602ce 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -21,6 +21,7 @@ mod commands;
mod configs;
mod error;
mod sender;
+pub mod sharding;
mod traits;
mod types;
mod utils;
diff --git a/core/common/src/sharding/local_idx.rs
b/core/common/src/sharding/local_idx.rs
new file mode 100644
index 000000000..43f22dfab
--- /dev/null
+++ b/core/common/src/sharding/local_idx.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ops::Deref;
+
+/// Index of a partition within a shard's local partition collection.
+///
+/// This is NOT the same as partition_id. A shard may have partitions with
+/// IDs [0, 2, 4] but their local indices would be [0, 1, 2].
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct LocalIdx(usize);
+
+impl LocalIdx {
+ pub fn new(idx: usize) -> Self {
+ Self(idx)
+ }
+
+ pub fn idx(&self) -> usize {
+ self.0
+ }
+}
+
+impl Deref for LocalIdx {
+ type Target = usize;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl From<usize> for LocalIdx {
+ fn from(idx: usize) -> Self {
+ Self(idx)
+ }
+}
diff --git a/core/common/src/sharding/mod.rs b/core/common/src/sharding/mod.rs
new file mode 100644
index 000000000..460c04ec3
--- /dev/null
+++ b/core/common/src/sharding/mod.rs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod local_idx;
+mod namespace;
+mod partition_location;
+mod shard_id;
+
+pub use local_idx::LocalIdx;
+pub use namespace::{
+ IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, PARTITION_BITS,
PARTITION_MASK,
+ PARTITION_SHIFT, STREAM_BITS, STREAM_MASK, STREAM_SHIFT, TOPIC_BITS,
TOPIC_MASK, TOPIC_SHIFT,
+};
+pub use partition_location::PartitionLocation;
+pub use shard_id::ShardId;
diff --git a/core/server/src/shard/namespace.rs
b/core/common/src/sharding/namespace.rs
similarity index 53%
copy from core/server/src/shard/namespace.rs
copy to core/common/src/sharding/namespace.rs
index 44162e013..0d00876ec 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/common/src/sharding/namespace.rs
@@ -1,31 +1,26 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::slab::partitions;
-use iggy_common::Identifier;
-
-// Packed namespace layout (works only on 64bit platforms, but we won't
support 32bit anyway)
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Packed namespace layout
// +----------------+----------------+----------------+----------------+
// | stream_id | topic_id | partition_id | unused |
// | STREAM_BITS | TOPIC_BITS | PARTITION_BITS | (64 - total) |
// +----------------+----------------+----------------+----------------+
-// TODO Use consts from the `slab` module.
pub const MAX_STREAMS: usize = 4096;
pub const MAX_TOPICS: usize = 4096;
pub const MAX_PARTITIONS: usize = 1_000_000;
@@ -54,39 +49,10 @@ pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) -
1;
pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1;
pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1;
-#[derive(Debug)]
-pub struct IggyFullNamespace {
- stream: Identifier,
- topic: Identifier,
- partition: partitions::ContainerId,
-}
-
-impl IggyFullNamespace {
- pub fn new(stream: Identifier, topic: Identifier, partition:
partitions::ContainerId) -> Self {
- Self {
- stream,
- topic,
- partition,
- }
- }
-
- pub fn stream_id(&self) -> &Identifier {
- &self.stream
- }
-
- pub fn topic_id(&self) -> &Identifier {
- &self.topic
- }
-
- pub fn partition_id(&self) -> partitions::ContainerId {
- self.partition
- }
-
- pub fn decompose(self) -> (Identifier, Identifier,
partitions::ContainerId) {
- (self.stream, self.topic, self.partition)
- }
-}
-
+/// Packed namespace identifier for shard assignment.
+///
+/// Encodes stream_id (12 bits), topic_id (12 bits), and partition_id (20 bits)
+/// into a single u64 for efficient hashing and routing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IggyNamespace(u64);
diff --git a/core/common/src/sharding/partition_location.rs
b/core/common/src/sharding/partition_location.rs
new file mode 100644
index 000000000..2bd201f90
--- /dev/null
+++ b/core/common/src/sharding/partition_location.rs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{LocalIdx, ShardId};
+
+/// Location of a partition: which shard owns it and its local index within
that shard.
+#[derive(Debug, Clone, Copy)]
+pub struct PartitionLocation {
+ pub shard_id: ShardId,
+ pub local_idx: LocalIdx,
+}
+
+impl PartitionLocation {
+ pub fn new(shard_id: ShardId, local_idx: LocalIdx) -> Self {
+ Self {
+ shard_id,
+ local_idx,
+ }
+ }
+}
diff --git a/core/common/src/sharding/shard_id.rs
b/core/common/src/sharding/shard_id.rs
new file mode 100644
index 000000000..ec546c370
--- /dev/null
+++ b/core/common/src/sharding/shard_id.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ops::Deref;
+
+/// Identifier for a shard in the cluster.
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
+pub struct ShardId(u16);
+
+impl ShardId {
+ pub fn new(id: u16) -> Self {
+ Self(id)
+ }
+
+ pub fn id(&self) -> u16 {
+ self.0
+ }
+}
+
+impl Deref for ShardId {
+ type Target = u16;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl From<u16> for ShardId {
+ fn from(id: u16) -> Self {
+ Self(id)
+ }
+}
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
new file mode 100644
index 000000000..2a343ba43
--- /dev/null
+++ b/core/partitions/Cargo.toml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "partitions"
+version = "0.1.0"
+description = "Iggy partitions abstraction for clustering support"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming", "partitions"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[dependencies]
+iggy_common = { path = "../common" }
diff --git a/core/partitions/src/iggy_partition.rs
b/core/partitions/src/iggy_partition.rs
new file mode 100644
index 000000000..28f564cf8
--- /dev/null
+++ b/core/partitions/src/iggy_partition.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[derive(Debug, Default)]
+pub struct IggyPartition {
+ // TODO(hubcio): integrate this
+}
+
+impl IggyPartition {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
new file mode 100644
index 000000000..77354783c
--- /dev/null
+++ b/core/partitions/src/iggy_partitions.rs
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![allow(dead_code)]
+
+use crate::IggyPartition;
+use iggy_common::sharding::{IggyNamespace, LocalIdx, ShardId};
+use std::collections::HashMap;
+
+/// Per-shard collection of all partitions.
+///
+/// This struct manages ALL partitions assigned to a single shard, regardless
+/// of which stream/topic they belong to.
+///
+/// Note: The partition_id within IggyNamespace may NOT equal the Vec index.
+/// For example, shard 0 might have partition_ids [0, 2, 4] while shard 1
+/// has partition_ids [1, 3, 5]. The `LocalIdx` provides the actual index
+/// into the `partitions` Vec.
+pub struct IggyPartitions {
+ shard_id: ShardId,
+ partitions: Vec<IggyPartition>,
+ namespace_to_local: HashMap<IggyNamespace, LocalIdx>,
+}
+
+impl IggyPartitions {
+ pub fn new(shard_id: ShardId) -> Self {
+ Self {
+ shard_id,
+ partitions: Vec::new(),
+ namespace_to_local: HashMap::new(),
+ }
+ }
+
+ pub fn with_capacity(shard_id: ShardId, capacity: usize) -> Self {
+ Self {
+ shard_id,
+ partitions: Vec::with_capacity(capacity),
+ namespace_to_local: HashMap::with_capacity(capacity),
+ }
+ }
+
+ pub fn shard_id(&self) -> ShardId {
+ self.shard_id
+ }
+
+ pub fn len(&self) -> usize {
+ self.partitions.len()
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.partitions.is_empty()
+ }
+
+ /// Get partition by local index.
+ pub fn get(&self, local_idx: LocalIdx) -> Option<&IggyPartition> {
+ self.partitions.get(*local_idx)
+ }
+
+ /// Get mutable partition by local index.
+ pub fn get_mut(&mut self, local_idx: LocalIdx) -> Option<&mut
IggyPartition> {
+ self.partitions.get_mut(*local_idx)
+ }
+
+ /// Lookup local index by namespace.
+ pub fn local_idx(&self, namespace: &IggyNamespace) -> Option<LocalIdx> {
+ self.namespace_to_local.get(namespace).copied()
+ }
+
+ /// Insert a new partition and return its local index.
+ pub fn insert(&mut self, namespace: IggyNamespace, partition:
IggyPartition) -> LocalIdx {
+ let local_idx = LocalIdx::new(self.partitions.len());
+ self.partitions.push(partition);
+ self.namespace_to_local.insert(namespace, local_idx);
+ local_idx
+ }
+
+ /// Remove a partition by namespace. Returns the removed partition if
found.
+ pub fn remove(&mut self, namespace: &IggyNamespace) ->
Option<IggyPartition> {
+ // TODO(hubcio): consider adding reverse map `LocalIdx →
IggyNamespace` for O(1)
+ // updates, or use a different data structure (e.g., slotmap) if
removal is frequent.
+
+ let local_idx = self.namespace_to_local.remove(namespace)?;
+ let idx = *local_idx;
+
+ if idx >= self.partitions.len() {
+ return None;
+ }
+
+ // Swap-remove for O(1) deletion
+ let partition = self.partitions.swap_remove(idx);
+
+ // If we swapped an element, update its index in the map
+ if idx < self.partitions.len() {
+ // Find the namespace that was at the last position (now at idx)
+ for (_ns, lidx) in self.namespace_to_local.iter_mut() {
+ if **lidx == self.partitions.len() {
+ *lidx = LocalIdx::new(idx);
+ break;
+ }
+ }
+ }
+
+ Some(partition)
+ }
+}
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
new file mode 100644
index 000000000..b679e05a9
--- /dev/null
+++ b/core/partitions/src/lib.rs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod iggy_partition;
+mod iggy_partitions;
+mod types;
+
+pub use iggy_partition::IggyPartition;
+pub use iggy_partitions::IggyPartitions;
+pub use types::{PollMetadata, PollingArgs, PollingConsumer,
SendMessagesResult};
+
+/// The core abstraction for partition operations in clustering.
+///
+/// This trait defines the data-plane operations for partitions that
+/// need to be coordinated across a cluster using viewstamped replication.
+/// Implementations can vary between single-node and clustered deployments.
+pub trait Partitions {
+ // TODO(hubcio): define partition operations like poll, send, create,
delete, etc.
+}
diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs
new file mode 100644
index 000000000..7ff1a9498
--- /dev/null
+++ b/core/partitions/src/types.rs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::PollingStrategy;
+
+/// Arguments for polling messages from a partition.
+#[derive(Debug, Clone)]
+pub struct PollingArgs {
+ pub strategy: PollingStrategy,
+ pub count: u32,
+ pub auto_commit: bool,
+}
+
+impl PollingArgs {
+ pub fn new(strategy: PollingStrategy, count: u32, auto_commit: bool) ->
Self {
+ Self {
+ strategy,
+ count,
+ auto_commit,
+ }
+ }
+}
+
+/// Metadata returned from a poll operation.
+#[derive(Debug, Clone)]
+pub struct PollMetadata {
+ pub partition_id: u32,
+ pub current_offset: u64,
+}
+
+impl PollMetadata {
+ pub fn new(partition_id: u32, current_offset: u64) -> Self {
+ Self {
+ partition_id,
+ current_offset,
+ }
+ }
+}
+
+/// Result of sending messages.
+#[derive(Debug)]
+pub struct SendMessagesResult {
+ pub messages_count: u32,
+}
+
+/// Consumer identification for offset operations.
+// TODO(hubcio): unify with server's `PollingConsumer` in
`streaming/polling_consumer.rs`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum PollingConsumer {
+ /// Regular consumer with (consumer_id, partition_id)
+ Consumer(usize, usize),
+ /// Consumer group with (group_id, member_id)
+ ConsumerGroup(usize, usize),
+}
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 1d2e5d9d0..375848799 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -18,7 +18,6 @@
use crate::binary::command::{BinaryServerCommand, HandlerResult,
ServerCommandHandler};
use crate::shard::IggyShard;
-use crate::shard::namespace::IggyNamespace;
use crate::shard::transmission::message::{ShardMessage, ShardRequest,
ShardRequestPayload};
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
use crate::streaming::session::Session;
@@ -29,6 +28,7 @@ use iggy_common::Identifier;
use iggy_common::PooledBuffer;
use iggy_common::SenderKind;
use iggy_common::Sizeable;
+use iggy_common::sharding::IggyNamespace;
use iggy_common::{INDEX_SIZE, PartitioningKind};
use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
use std::rc::Rc;
diff --git
a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index ed7271901..a4eb20654 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -22,7 +22,6 @@ use crate::binary::command::{
use crate::binary::handlers::partitions::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
-use crate::shard::namespace::IggyNamespace;
use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -33,6 +32,7 @@ use crate::streaming::session::Session;
use anyhow::Result;
use err_trail::ErrContext;
use iggy_common::delete_segments::DeleteSegments;
+use iggy_common::sharding::IggyNamespace;
use iggy_common::{IggyError, SenderKind};
use std::rc::Rc;
use tracing::{debug, instrument};
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index ebbf28f7a..3fe36e20d 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -23,6 +23,7 @@ use dashmap::DashMap;
use dotenvy::dotenv;
use err_trail::ErrContext;
use figlet_rs::FIGfont;
+use iggy_common::sharding::{IggyNamespace, LocalIdx, PartitionLocation,
ShardId};
use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError, MemoryPool};
use server::SEMANTIC_VERSION;
use server::args::Args;
@@ -35,9 +36,7 @@ use server::diagnostics::{print_io_uring_permission_info,
print_locked_memory_li
use server::io::fs_utils;
use server::log::logger::Logging;
use server::server_error::ServerError;
-use server::shard::namespace::IggyNamespace;
use server::shard::system::info::SystemInfo;
-use server::shard::transmission::id::ShardId;
use server::shard::{IggyShard, calculate_shard_assignment};
use server::slab::traits_ext::{
EntityComponentSystem, EntityComponentSystemMutCell, IntoComponents,
@@ -314,7 +313,7 @@ fn main() -> Result<(), ServerError> {
// Shared resources bootstrap.
let shards_table =
Box::new(DashMap::with_capacity(SHARDS_TABLE_CAPACITY));
let shards_table = Box::leak(shards_table);
- let shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>> =
shards_table.into();
+ let shards_table: EternalPtr<DashMap<IggyNamespace,
PartitionLocation>> = shards_table.into();
let client_manager = Box::new(DashMap::new());
let client_manager = Box::leak(client_manager);
@@ -338,7 +337,9 @@ fn main() -> Result<(), ServerError> {
&ns,
shard_assignment.len() as u32,
));
- shards_table.insert(ns, shard_id);
+ // TODO(hubcio): LocalIdx is 0 until
IggyPartitions is integratedds
+ let location =
PartitionLocation::new(shard_id, LocalIdx::new(0));
+ shards_table.insert(ns, location);
}
});
}
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 188452fe8..135a9b4bc 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -18,11 +18,10 @@
use super::{
IggyShard, TaskRegistry, transmission::connector::ShardConnector,
- transmission::frame::ShardFrame, transmission::id::ShardId,
+ transmission::frame::ShardFrame,
};
use crate::{
configs::server::ServerConfig,
- shard::namespace::IggyNamespace,
slab::{streams::Streams, users::Users},
state::file::FileState,
streaming::{
@@ -33,13 +32,14 @@ use crate::{
};
use dashmap::DashMap;
use iggy_common::EncryptorKind;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation};
use std::{cell::Cell, rc::Rc, sync::atomic::AtomicBool};
#[derive(Default)]
pub struct IggyShardBuilder {
id: Option<u16>,
streams: Option<Streams>,
- shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardId>>>,
+ shards_table: Option<EternalPtr<DashMap<IggyNamespace,
PartitionLocation>>>,
state: Option<FileState>,
users: Option<Users>,
client_manager: Option<ClientManager>,
@@ -69,7 +69,7 @@ impl IggyShardBuilder {
pub fn shards_table(
mut self,
- shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>>,
+ shards_table: EternalPtr<DashMap<IggyNamespace, PartitionLocation>>,
) -> Self {
self.shards_table = Some(shards_table);
self
diff --git a/core/server/src/shard/communication.rs
b/core/server/src/shard/communication.rs
index 59dc11183..497e61013 100644
--- a/core/server/src/shard/communication.rs
+++ b/core/server/src/shard/communication.rs
@@ -17,18 +17,17 @@
use crate::shard::{
BROADCAST_TIMEOUT, COMPONENT, IggyShard,
- namespace::IggyNamespace,
transmission::{
connector::ShardConnector,
event::ShardEvent,
frame::ShardFrame,
- id::ShardId,
message::{ShardMessage, ShardSendRequestResult},
},
};
use futures::future::join_all;
use hash32::{Hasher, Murmur3Hasher};
use iggy_common::IggyError;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation};
use std::hash::Hasher as _;
use tracing::{error, info, warn};
@@ -138,48 +137,48 @@ impl IggyShard {
}
pub fn find_shard(&self, namespace: &IggyNamespace) ->
Option<&ShardConnector<ShardFrame>> {
- self.shards_table.get(namespace).map(|shard_id| {
+ self.shards_table.get(namespace).map(|location| {
self.shards
.iter()
- .find(|shard| shard.id == shard_id.id())
+ .find(|shard| shard.id == *location.shard_id)
.expect("Shard not found in the shards table.")
})
}
- pub fn find_shard_table_record(&self, namespace: &IggyNamespace) ->
Option<ShardId> {
+ pub fn find_shard_table_record(&self, namespace: &IggyNamespace) ->
Option<PartitionLocation> {
self.shards_table.get(namespace).map(|entry| *entry)
}
- pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) ->
ShardId {
+ pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) ->
PartitionLocation {
self.shards_table
.remove(namespace)
- .map(|(_, shard_id)| shard_id)
+ .map(|(_, location)| location)
.expect("remove_shard_table_record: namespace not found")
}
pub fn remove_shard_table_records(
&self,
namespaces: &[IggyNamespace],
- ) -> Vec<(IggyNamespace, ShardId)> {
+ ) -> Vec<(IggyNamespace, PartitionLocation)> {
namespaces
.iter()
.map(|ns| {
- let (ns, shard_id) = self.shards_table.remove(ns).unwrap();
- (ns, shard_id)
+ let (ns, location) = self.shards_table.remove(ns).unwrap();
+ (ns, location)
})
.collect()
}
- pub fn insert_shard_table_record(&self, ns: IggyNamespace, shard_id:
ShardId) {
- self.shards_table.insert(ns, shard_id);
+ pub fn insert_shard_table_record(&self, ns: IggyNamespace, location:
PartitionLocation) {
+ self.shards_table.insert(ns, location);
}
pub fn get_current_shard_namespaces(&self) -> Vec<IggyNamespace> {
self.shards_table
.iter()
.filter_map(|entry| {
- let (ns, shard_id) = entry.pair();
- if shard_id.id() == self.id {
+ let (ns, location) = entry.pair();
+ if *location.shard_id == self.id {
Some(*ns)
} else {
None
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index f48279362..8939e87b0 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -33,9 +33,7 @@ use self::tasks::{continuous, periodic};
use crate::{
configs::server::ServerConfig,
io::fs_locks::FsLocks,
- shard::{
- namespace::IggyNamespace, task_registry::TaskRegistry,
transmission::frame::ShardFrame,
- },
+ shard::{task_registry::TaskRegistry, transmission::frame::ShardFrame},
slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
state::file::FileState,
streaming::{
@@ -46,6 +44,7 @@ use crate::{
};
use builder::IggyShardBuilder;
use dashmap::DashMap;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation};
use iggy_common::{EncryptorKind, Identifier, IggyError};
use std::{
cell::{Cell, RefCell},
@@ -55,10 +54,7 @@ use std::{
time::{Duration, Instant},
};
use tracing::{debug, error, info, instrument};
-use transmission::{
- connector::{Receiver, ShardConnector, StopReceiver},
- id::ShardId,
-};
+use transmission::connector::{Receiver, ShardConnector, StopReceiver};
pub const COMPONENT: &str = "SHARD";
pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
@@ -70,7 +66,7 @@ pub struct IggyShard {
_version: SemanticVersion,
pub(crate) streams: Streams,
- pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>>,
+ pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace,
PartitionLocation>>,
pub(crate) state: FileState,
pub(crate) fs_locks: FsLocks,
@@ -190,9 +186,9 @@ impl IggyShard {
async fn load_segments(&self) -> Result<(), IggyError> {
use crate::bootstrap::load_segments;
for shard_entry in self.shards_table.iter() {
- let (namespace, shard_id) = shard_entry.pair();
+ let (namespace, location) = shard_entry.pair();
- if **shard_id == self.id {
+ if *location.shard_id == self.id {
let stream_id = namespace.stream_id();
let topic_id: usize = namespace.topic_id();
let partition_id = namespace.partition_id();
diff --git a/core/server/src/shard/namespace.rs
b/core/server/src/shard/namespace.rs
index 44162e013..289587179 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -19,41 +19,6 @@
use crate::slab::partitions;
use iggy_common::Identifier;
-// Packed namespace layout (works only on 64bit platforms, but we won't
support 32bit anyway)
-// +----------------+----------------+----------------+----------------+
-// | stream_id | topic_id | partition_id | unused |
-// | STREAM_BITS | TOPIC_BITS | PARTITION_BITS | (64 - total) |
-// +----------------+----------------+----------------+----------------+
-
-// TODO Use consts from the `slab` module.
-pub const MAX_STREAMS: usize = 4096;
-pub const MAX_TOPICS: usize = 4096;
-pub const MAX_PARTITIONS: usize = 1_000_000;
-
-const fn bits_required(mut n: u64) -> u32 {
- if n == 0 {
- return 1;
- }
- let mut b = 0;
- while n > 0 {
- b += 1;
- n >>= 1;
- }
- b
-}
-
-pub const STREAM_BITS: u32 = bits_required((MAX_STREAMS - 1) as u64);
-pub const TOPIC_BITS: u32 = bits_required((MAX_TOPICS - 1) as u64);
-pub const PARTITION_BITS: u32 = bits_required((MAX_PARTITIONS - 1) as u64);
-
-pub const PARTITION_SHIFT: u32 = 0;
-pub const TOPIC_SHIFT: u32 = PARTITION_SHIFT + PARTITION_BITS;
-pub const STREAM_SHIFT: u32 = TOPIC_SHIFT + TOPIC_BITS;
-
-pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 1;
-pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1;
-pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1;
-
#[derive(Debug)]
pub struct IggyFullNamespace {
stream: Identifier,
@@ -86,36 +51,3 @@ impl IggyFullNamespace {
(self.stream, self.topic, self.partition)
}
}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub struct IggyNamespace(u64);
-
-impl IggyNamespace {
- #[inline]
- pub fn inner(&self) -> u64 {
- self.0
- }
-
- #[inline]
- pub fn stream_id(&self) -> usize {
- ((self.0 >> STREAM_SHIFT) & STREAM_MASK) as usize
- }
-
- #[inline]
- pub fn topic_id(&self) -> usize {
- ((self.0 >> TOPIC_SHIFT) & TOPIC_MASK) as usize
- }
-
- #[inline]
- pub fn partition_id(&self) -> usize {
- ((self.0 >> PARTITION_SHIFT) & PARTITION_MASK) as usize
- }
-
- #[inline]
- pub fn new(stream: usize, topic: usize, partition: usize) -> Self {
- let value = ((stream as u64) & STREAM_MASK) << STREAM_SHIFT
- | ((topic as u64) & TOPIC_MASK) << TOPIC_SHIFT
- | ((partition as u64) & PARTITION_MASK) << PARTITION_SHIFT;
- Self(value)
- }
-}
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 0c7e4853a..02e8d9d0e 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -19,7 +19,7 @@
use super::COMPONENT;
use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
use crate::shard::IggyShard;
-use crate::shard::namespace::{IggyFullNamespace, IggyNamespace};
+use crate::shard::namespace::IggyFullNamespace;
use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -29,6 +29,7 @@ use crate::streaming::traits::MainOps;
use crate::streaming::{partitions, streams, topics};
use err_trail::ErrContext;
use iggy_common::PooledBuffer;
+use iggy_common::sharding::IggyNamespace;
use iggy_common::{
BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE,
Identifier, IggyError,
PollingKind, PollingStrategy,
diff --git a/core/server/src/shard/system/partitions.rs
b/core/server/src/shard/system/partitions.rs
index e54b6678d..aacbd52c0 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -19,8 +19,6 @@
use super::COMPONENT;
use crate::shard::IggyShard;
use crate::shard::calculate_shard_assignment;
-use crate::shard::namespace::IggyNamespace;
-use crate::shard::transmission::id::ShardId;
use crate::slab::traits_ext::EntityMarker;
use crate::slab::traits_ext::IntoComponents;
use crate::streaming::partitions;
@@ -35,6 +33,7 @@ use crate::streaming::topics;
use err_trail::ErrContext;
use iggy_common::Identifier;
use iggy_common::IggyError;
+use iggy_common::sharding::{IggyNamespace, LocalIdx, PartitionLocation,
ShardId};
use tracing::info;
impl IggyShard {
@@ -105,7 +104,10 @@ impl IggyShard {
let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id,
partition_id);
let shard_id = ShardId::new(calculate_shard_assignment(&ns,
shards_count));
let is_current_shard = self.id == *shard_id;
- self.insert_shard_table_record(ns, shard_id);
+ // TODO(hubcio): LocalIdx(0) is wrong.. When IggyPartitions is
integrated into
+ // IggyShard, this should use the actual index returned by
IggyPartitions::insert().
+ let location = PartitionLocation::new(shard_id, LocalIdx::new(0));
+ self.insert_shard_table_record(ns, location);
create_partition_file_hierarchy(
numeric_stream_id as usize,
@@ -204,11 +206,14 @@ impl IggyShard {
"create_partitions_bypass_auth: partition mismatch ID, wrong
creation order ?!"
);
let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id,
id);
- let shard_id = self.find_shard_table_record(&ns).unwrap_or_else(||
{
+ // TODO(hubcio): when IggyPartitions is integrated, this fallback
path should
+ // either be removed or use proper index resolution.
+ let location = self.find_shard_table_record(&ns).unwrap_or_else(||
{
tracing::warn!("WARNING: missing shard table record for
namespace: {:?}, in the event handler for `CreatedPartitions` event.", ns);
- ShardId::new(calculate_shard_assignment(&ns, shards_count))
+ let shard_id = ShardId::new(calculate_shard_assignment(&ns,
shards_count));
+ PartitionLocation::new(shard_id, LocalIdx::new(0))
});
- if self.id == *shard_id {
+ if self.id == *location.shard_id {
self.init_log(stream_id, topic_id, id).await?;
}
}
diff --git a/core/server/src/shard/transmission/id.rs
b/core/server/src/shard/transmission/id.rs
deleted file mode 100644
index ad16ff9ad..000000000
--- a/core/server/src/shard/transmission/id.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use std::ops::Deref;
-
-// TODO: Maybe pad to cache line size?
-#[derive(Debug, Clone, Copy, Eq, PartialEq)]
-pub struct ShardId {
- id: u16,
-}
-
-impl ShardId {
- pub fn new(id: u16) -> Self {
- Self { id }
- }
-
- pub fn id(&self) -> u16 {
- self.id
- }
-}
-
-impl Deref for ShardId {
- type Target = u16;
-
- fn deref(&self) -> &Self::Target {
- &self.id
- }
-}
diff --git a/core/server/src/shard/transmission/mod.rs
b/core/server/src/shard/transmission/mod.rs
index 400c93652..107bdcbd9 100644
--- a/core/server/src/shard/transmission/mod.rs
+++ b/core/server/src/shard/transmission/mod.rs
@@ -19,5 +19,4 @@
pub mod connector;
pub mod event;
pub mod frame;
-pub mod id;
pub mod message;