This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new f8b5c0570 feat(partitions): add partitions abstraction for clustering 
(#2514)
f8b5c0570 is described below

commit f8b5c0570985de8389a273296daacbb0fd30f532
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 2 14:47:48 2026 +0100

    feat(partitions): add partitions abstraction for clustering (#2514)
---
 .github/workflows/_common.yml                      |   1 +
 Cargo.lock                                         |   7 ++
 Cargo.toml                                         |   1 +
 DEPENDENCIES.md                                    |   1 +
 core/common/src/lib.rs                             |   1 +
 core/common/src/sharding/local_idx.rs              |  49 +++++++++
 core/common/src/sharding/mod.rs                    |  29 +++++
 .../src/shard => common/src/sharding}/namespace.rs |  78 ++++----------
 core/common/src/sharding/partition_location.rs     |  34 ++++++
 core/common/src/sharding/shard_id.rs               |  46 ++++++++
 core/partitions/Cargo.toml                         |  31 ++++++
 core/partitions/src/iggy_partition.rs              |  27 +++++
 core/partitions/src/iggy_partitions.rs             | 119 +++++++++++++++++++++
 core/partitions/src/lib.rs                         |  33 ++++++
 core/partitions/src/types.rs                       |  68 ++++++++++++
 .../handlers/messages/send_messages_handler.rs     |   2 +-
 .../handlers/segments/delete_segments_handler.rs   |   2 +-
 core/server/src/main.rs                            |   9 +-
 core/server/src/shard/builder.rs                   |   8 +-
 core/server/src/shard/communication.rs             |  27 +++--
 core/server/src/shard/mod.rs                       |  16 ++-
 core/server/src/shard/namespace.rs                 |  68 ------------
 core/server/src/shard/system/messages.rs           |   3 +-
 core/server/src/shard/system/partitions.rs         |  17 +--
 core/server/src/shard/transmission/id.rs           |  44 --------
 core/server/src/shard/transmission/mod.rs          |   1 -
 26 files changed, 512 insertions(+), 210 deletions(-)

diff --git a/.github/workflows/_common.yml b/.github/workflows/_common.yml
index 51b8277a6..9f8eccf2e 100644
--- a/.github/workflows/_common.yml
+++ b/.github/workflows/_common.yml
@@ -86,6 +86,7 @@ jobs:
             java
             js
             mcp
+            partitions
             proc
             pr_template
             python
diff --git a/Cargo.lock b/Cargo.lock
index 45a29d47d..b37ab1d22 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6496,6 +6496,13 @@ dependencies = [
  "syn 2.0.111",
 ]
 
+[[package]]
+name = "partitions"
+version = "0.1.0"
+dependencies = [
+ "iggy_common",
+]
+
 [[package]]
 name = "passterm"
 version = "2.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index c7c82a4b6..3b68471ce 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -44,6 +44,7 @@ members = [
     "core/journal",
     "core/message_bus",
     "core/metadata",
+    "core/partitions",
     "core/sdk",
     "core/server",
     "core/tools",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 65988b8ae..88133f67a 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -565,6 +565,7 @@ parking_lot_core: 0.9.12, "Apache-2.0 OR MIT",
 parquet: 55.2.0, "Apache-2.0",
 parse-display: 0.9.1, "Apache-2.0 OR MIT",
 parse-display-derive: 0.9.1, "Apache-2.0 OR MIT",
+partitions: 0.1.0, "Apache-2.0",
 passterm: 2.0.1, "BSD-3-Clause",
 password-hash: 0.5.0, "Apache-2.0 OR MIT",
 paste: 1.0.15, "Apache-2.0 OR MIT",
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 1adff1f9f..af04602ce 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -21,6 +21,7 @@ mod commands;
 mod configs;
 mod error;
 mod sender;
+pub mod sharding;
 mod traits;
 mod types;
 mod utils;
diff --git a/core/common/src/sharding/local_idx.rs 
b/core/common/src/sharding/local_idx.rs
new file mode 100644
index 000000000..43f22dfab
--- /dev/null
+++ b/core/common/src/sharding/local_idx.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ops::Deref;
+
+/// Index of a partition within a shard's local partition collection.
+///
+/// This is NOT the same as partition_id. A shard may have partitions with
+/// IDs [0, 2, 4] but their local indices would be [0, 1, 2].
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct LocalIdx(usize);
+
+impl LocalIdx {
+    pub fn new(idx: usize) -> Self {
+        Self(idx)
+    }
+
+    pub fn idx(&self) -> usize {
+        self.0
+    }
+}
+
+impl Deref for LocalIdx {
+    type Target = usize;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl From<usize> for LocalIdx {
+    fn from(idx: usize) -> Self {
+        Self(idx)
+    }
+}
diff --git a/core/common/src/sharding/mod.rs b/core/common/src/sharding/mod.rs
new file mode 100644
index 000000000..460c04ec3
--- /dev/null
+++ b/core/common/src/sharding/mod.rs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod local_idx;
+mod namespace;
+mod partition_location;
+mod shard_id;
+
+pub use local_idx::LocalIdx;
+pub use namespace::{
+    IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, PARTITION_BITS, 
PARTITION_MASK,
+    PARTITION_SHIFT, STREAM_BITS, STREAM_MASK, STREAM_SHIFT, TOPIC_BITS, 
TOPIC_MASK, TOPIC_SHIFT,
+};
+pub use partition_location::PartitionLocation;
+pub use shard_id::ShardId;
diff --git a/core/server/src/shard/namespace.rs 
b/core/common/src/sharding/namespace.rs
similarity index 53%
copy from core/server/src/shard/namespace.rs
copy to core/common/src/sharding/namespace.rs
index 44162e013..0d00876ec 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/common/src/sharding/namespace.rs
@@ -1,31 +1,26 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::slab::partitions;
-use iggy_common::Identifier;
-
-// Packed namespace layout (works only on 64bit platforms, but we won't 
support 32bit anyway)
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Packed namespace layout
 // +----------------+----------------+----------------+----------------+
 // |    stream_id   |    topic_id    |  partition_id  |     unused     |
 // |    STREAM_BITS |    TOPIC_BITS  | PARTITION_BITS |  (64 - total)  |
 // +----------------+----------------+----------------+----------------+
 
-// TODO Use consts from the `slab` module.
 pub const MAX_STREAMS: usize = 4096;
 pub const MAX_TOPICS: usize = 4096;
 pub const MAX_PARTITIONS: usize = 1_000_000;
@@ -54,39 +49,10 @@ pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 
1;
 pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1;
 pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1;
 
-#[derive(Debug)]
-pub struct IggyFullNamespace {
-    stream: Identifier,
-    topic: Identifier,
-    partition: partitions::ContainerId,
-}
-
-impl IggyFullNamespace {
-    pub fn new(stream: Identifier, topic: Identifier, partition: 
partitions::ContainerId) -> Self {
-        Self {
-            stream,
-            topic,
-            partition,
-        }
-    }
-
-    pub fn stream_id(&self) -> &Identifier {
-        &self.stream
-    }
-
-    pub fn topic_id(&self) -> &Identifier {
-        &self.topic
-    }
-
-    pub fn partition_id(&self) -> partitions::ContainerId {
-        self.partition
-    }
-
-    pub fn decompose(self) -> (Identifier, Identifier, 
partitions::ContainerId) {
-        (self.stream, self.topic, self.partition)
-    }
-}
-
+/// Packed namespace identifier for shard assignment.
+///
+/// Encodes stream_id (12 bits), topic_id (12 bits), and partition_id (20 bits)
+/// into a single u64 for efficient hashing and routing.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct IggyNamespace(u64);
 
diff --git a/core/common/src/sharding/partition_location.rs 
b/core/common/src/sharding/partition_location.rs
new file mode 100644
index 000000000..2bd201f90
--- /dev/null
+++ b/core/common/src/sharding/partition_location.rs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{LocalIdx, ShardId};
+
+/// Location of a partition: which shard owns it and its local index within 
that shard.
+#[derive(Debug, Clone, Copy)]
+pub struct PartitionLocation {
+    pub shard_id: ShardId,
+    pub local_idx: LocalIdx,
+}
+
+impl PartitionLocation {
+    pub fn new(shard_id: ShardId, local_idx: LocalIdx) -> Self {
+        Self {
+            shard_id,
+            local_idx,
+        }
+    }
+}
diff --git a/core/common/src/sharding/shard_id.rs 
b/core/common/src/sharding/shard_id.rs
new file mode 100644
index 000000000..ec546c370
--- /dev/null
+++ b/core/common/src/sharding/shard_id.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ops::Deref;
+
+/// Identifier for a shard in the cluster.
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
+pub struct ShardId(u16);
+
+impl ShardId {
+    pub fn new(id: u16) -> Self {
+        Self(id)
+    }
+
+    pub fn id(&self) -> u16 {
+        self.0
+    }
+}
+
+impl Deref for ShardId {
+    type Target = u16;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl From<u16> for ShardId {
+    fn from(id: u16) -> Self {
+        Self(id)
+    }
+}
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
new file mode 100644
index 000000000..2a343ba43
--- /dev/null
+++ b/core/partitions/Cargo.toml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "partitions"
+version = "0.1.0"
+description = "Iggy partitions abstraction for clustering support"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming", "partitions"]
+homepage = "https://iggy.apache.org";
+documentation = "https://iggy.apache.org/docs";
+repository = "https://github.com/apache/iggy";
+readme = "../../README.md"
+
+[dependencies]
+iggy_common = { path = "../common" }
diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
new file mode 100644
index 000000000..28f564cf8
--- /dev/null
+++ b/core/partitions/src/iggy_partition.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[derive(Debug, Default)]
+pub struct IggyPartition {
+    // TODO(hubcio): integrate this
+}
+
+impl IggyPartition {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
new file mode 100644
index 000000000..77354783c
--- /dev/null
+++ b/core/partitions/src/iggy_partitions.rs
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![allow(dead_code)]
+
+use crate::IggyPartition;
+use iggy_common::sharding::{IggyNamespace, LocalIdx, ShardId};
+use std::collections::HashMap;
+
+/// Per-shard collection of all partitions.
+///
+/// This struct manages ALL partitions assigned to a single shard, regardless
+/// of which stream/topic they belong to.
+///
+/// Note: The partition_id within IggyNamespace may NOT equal the Vec index.
+/// For example, shard 0 might have partition_ids [0, 2, 4] while shard 1
+/// has partition_ids [1, 3, 5]. The `LocalIdx` provides the actual index
+/// into the `partitions` Vec.
+pub struct IggyPartitions {
+    shard_id: ShardId,
+    partitions: Vec<IggyPartition>,
+    namespace_to_local: HashMap<IggyNamespace, LocalIdx>,
+}
+
+impl IggyPartitions {
+    pub fn new(shard_id: ShardId) -> Self {
+        Self {
+            shard_id,
+            partitions: Vec::new(),
+            namespace_to_local: HashMap::new(),
+        }
+    }
+
+    pub fn with_capacity(shard_id: ShardId, capacity: usize) -> Self {
+        Self {
+            shard_id,
+            partitions: Vec::with_capacity(capacity),
+            namespace_to_local: HashMap::with_capacity(capacity),
+        }
+    }
+
+    pub fn shard_id(&self) -> ShardId {
+        self.shard_id
+    }
+
+    pub fn len(&self) -> usize {
+        self.partitions.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.partitions.is_empty()
+    }
+
+    /// Get partition by local index.
+    pub fn get(&self, local_idx: LocalIdx) -> Option<&IggyPartition> {
+        self.partitions.get(*local_idx)
+    }
+
+    /// Get mutable partition by local index.
+    pub fn get_mut(&mut self, local_idx: LocalIdx) -> Option<&mut 
IggyPartition> {
+        self.partitions.get_mut(*local_idx)
+    }
+
+    /// Lookup local index by namespace.
+    pub fn local_idx(&self, namespace: &IggyNamespace) -> Option<LocalIdx> {
+        self.namespace_to_local.get(namespace).copied()
+    }
+
+    /// Insert a new partition and return its local index.
+    pub fn insert(&mut self, namespace: IggyNamespace, partition: 
IggyPartition) -> LocalIdx {
+        let local_idx = LocalIdx::new(self.partitions.len());
+        self.partitions.push(partition);
+        self.namespace_to_local.insert(namespace, local_idx);
+        local_idx
+    }
+
+    /// Remove a partition by namespace. Returns the removed partition if 
found.
+    pub fn remove(&mut self, namespace: &IggyNamespace) -> 
Option<IggyPartition> {
+        // TODO(hubcio): consider adding reverse map `LocalIdx → 
IggyNamespace` for O(1)
+        // updates, or use a different data structure (e.g., slotmap) if 
removal is frequent.
+
+        let local_idx = self.namespace_to_local.remove(namespace)?;
+        let idx = *local_idx;
+
+        if idx >= self.partitions.len() {
+            return None;
+        }
+
+        // Swap-remove for O(1) deletion
+        let partition = self.partitions.swap_remove(idx);
+
+        // If we swapped an element, update its index in the map
+        if idx < self.partitions.len() {
+            // Find the namespace that was at the last position (now at idx)
+            for (_ns, lidx) in self.namespace_to_local.iter_mut() {
+                if **lidx == self.partitions.len() {
+                    *lidx = LocalIdx::new(idx);
+                    break;
+                }
+            }
+        }
+
+        Some(partition)
+    }
+}
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
new file mode 100644
index 000000000..b679e05a9
--- /dev/null
+++ b/core/partitions/src/lib.rs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod iggy_partition;
+mod iggy_partitions;
+mod types;
+
+pub use iggy_partition::IggyPartition;
+pub use iggy_partitions::IggyPartitions;
+pub use types::{PollMetadata, PollingArgs, PollingConsumer, 
SendMessagesResult};
+
+/// The core abstraction for partition operations in clustering.
+///
+/// This trait defines the data-plane operations for partitions that
+/// need to be coordinated across a cluster using viewstamped replication.
+/// Implementations can vary between single-node and clustered deployments.
+pub trait Partitions {
+    // TODO(hubcio): define partition operations like poll, send, create, 
delete, etc.
+}
diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs
new file mode 100644
index 000000000..7ff1a9498
--- /dev/null
+++ b/core/partitions/src/types.rs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::PollingStrategy;
+
+/// Arguments for polling messages from a partition.
+#[derive(Debug, Clone)]
+pub struct PollingArgs {
+    pub strategy: PollingStrategy,
+    pub count: u32,
+    pub auto_commit: bool,
+}
+
+impl PollingArgs {
+    pub fn new(strategy: PollingStrategy, count: u32, auto_commit: bool) -> 
Self {
+        Self {
+            strategy,
+            count,
+            auto_commit,
+        }
+    }
+}
+
+/// Metadata returned from a poll operation.
+#[derive(Debug, Clone)]
+pub struct PollMetadata {
+    pub partition_id: u32,
+    pub current_offset: u64,
+}
+
+impl PollMetadata {
+    pub fn new(partition_id: u32, current_offset: u64) -> Self {
+        Self {
+            partition_id,
+            current_offset,
+        }
+    }
+}
+
+/// Result of sending messages.
+#[derive(Debug)]
+pub struct SendMessagesResult {
+    pub messages_count: u32,
+}
+
+/// Consumer identification for offset operations.
+// TODO(hubcio): unify with server's `PollingConsumer` in 
`streaming/polling_consumer.rs`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum PollingConsumer {
+    /// Regular consumer with (consumer_id, partition_id)
+    Consumer(usize, usize),
+    /// Consumer group with (group_id, member_id)
+    ConsumerGroup(usize, usize),
+}
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs 
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 1d2e5d9d0..375848799 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -18,7 +18,6 @@
 
 use crate::binary::command::{BinaryServerCommand, HandlerResult, 
ServerCommandHandler};
 use crate::shard::IggyShard;
-use crate::shard::namespace::IggyNamespace;
 use crate::shard::transmission::message::{ShardMessage, ShardRequest, 
ShardRequestPayload};
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
@@ -29,6 +28,7 @@ use iggy_common::Identifier;
 use iggy_common::PooledBuffer;
 use iggy_common::SenderKind;
 use iggy_common::Sizeable;
+use iggy_common::sharding::IggyNamespace;
 use iggy_common::{INDEX_SIZE, PartitioningKind};
 use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
 use std::rc::Rc;
diff --git 
a/core/server/src/binary/handlers/segments/delete_segments_handler.rs 
b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index ed7271901..a4eb20654 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -22,7 +22,6 @@ use crate::binary::command::{
 use crate::binary::handlers::partitions::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
-use crate::shard::namespace::IggyNamespace;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -33,6 +32,7 @@ use crate::streaming::session::Session;
 use anyhow::Result;
 use err_trail::ErrContext;
 use iggy_common::delete_segments::DeleteSegments;
+use iggy_common::sharding::IggyNamespace;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index ebbf28f7a..3fe36e20d 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -23,6 +23,7 @@ use dashmap::DashMap;
 use dotenvy::dotenv;
 use err_trail::ErrContext;
 use figlet_rs::FIGfont;
+use iggy_common::sharding::{IggyNamespace, LocalIdx, PartitionLocation, 
ShardId};
 use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError, MemoryPool};
 use server::SEMANTIC_VERSION;
 use server::args::Args;
@@ -35,9 +36,7 @@ use server::diagnostics::{print_io_uring_permission_info, 
print_locked_memory_li
 use server::io::fs_utils;
 use server::log::logger::Logging;
 use server::server_error::ServerError;
-use server::shard::namespace::IggyNamespace;
 use server::shard::system::info::SystemInfo;
-use server::shard::transmission::id::ShardId;
 use server::shard::{IggyShard, calculate_shard_assignment};
 use server::slab::traits_ext::{
     EntityComponentSystem, EntityComponentSystemMutCell, IntoComponents,
@@ -314,7 +313,7 @@ fn main() -> Result<(), ServerError> {
         // Shared resources bootstrap.
         let shards_table = 
Box::new(DashMap::with_capacity(SHARDS_TABLE_CAPACITY));
         let shards_table = Box::leak(shards_table);
-        let shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>> = 
shards_table.into();
+        let shards_table: EternalPtr<DashMap<IggyNamespace, 
PartitionLocation>> = shards_table.into();
 
         let client_manager = Box::new(DashMap::new());
         let client_manager = Box::leak(client_manager);
@@ -338,7 +337,9 @@ fn main() -> Result<(), ServerError> {
                                     &ns,
                                     shard_assignment.len() as u32,
                                 ));
-                                shards_table.insert(ns, shard_id);
+                                // TODO(hubcio): LocalIdx is 0 until 
IggyPartitions is integratedds
+                                let location = 
PartitionLocation::new(shard_id, LocalIdx::new(0));
+                                shards_table.insert(ns, location);
                             }
                         });
                     }
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 188452fe8..135a9b4bc 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -18,11 +18,10 @@
 
 use super::{
     IggyShard, TaskRegistry, transmission::connector::ShardConnector,
-    transmission::frame::ShardFrame, transmission::id::ShardId,
+    transmission::frame::ShardFrame,
 };
 use crate::{
     configs::server::ServerConfig,
-    shard::namespace::IggyNamespace,
     slab::{streams::Streams, users::Users},
     state::file::FileState,
     streaming::{
@@ -33,13 +32,14 @@ use crate::{
 };
 use dashmap::DashMap;
 use iggy_common::EncryptorKind;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation};
 use std::{cell::Cell, rc::Rc, sync::atomic::AtomicBool};
 
 #[derive(Default)]
 pub struct IggyShardBuilder {
     id: Option<u16>,
     streams: Option<Streams>,
-    shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardId>>>,
+    shards_table: Option<EternalPtr<DashMap<IggyNamespace, 
PartitionLocation>>>,
     state: Option<FileState>,
     users: Option<Users>,
     client_manager: Option<ClientManager>,
@@ -69,7 +69,7 @@ impl IggyShardBuilder {
 
     pub fn shards_table(
         mut self,
-        shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>>,
+        shards_table: EternalPtr<DashMap<IggyNamespace, PartitionLocation>>,
     ) -> Self {
         self.shards_table = Some(shards_table);
         self
diff --git a/core/server/src/shard/communication.rs 
b/core/server/src/shard/communication.rs
index 59dc11183..497e61013 100644
--- a/core/server/src/shard/communication.rs
+++ b/core/server/src/shard/communication.rs
@@ -17,18 +17,17 @@
 
 use crate::shard::{
     BROADCAST_TIMEOUT, COMPONENT, IggyShard,
-    namespace::IggyNamespace,
     transmission::{
         connector::ShardConnector,
         event::ShardEvent,
         frame::ShardFrame,
-        id::ShardId,
         message::{ShardMessage, ShardSendRequestResult},
     },
 };
 use futures::future::join_all;
 use hash32::{Hasher, Murmur3Hasher};
 use iggy_common::IggyError;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation};
 use std::hash::Hasher as _;
 use tracing::{error, info, warn};
 
@@ -138,48 +137,48 @@ impl IggyShard {
     }
 
     pub fn find_shard(&self, namespace: &IggyNamespace) -> 
Option<&ShardConnector<ShardFrame>> {
-        self.shards_table.get(namespace).map(|shard_id| {
+        self.shards_table.get(namespace).map(|location| {
             self.shards
                 .iter()
-                .find(|shard| shard.id == shard_id.id())
+                .find(|shard| shard.id == *location.shard_id)
                 .expect("Shard not found in the shards table.")
         })
     }
 
-    pub fn find_shard_table_record(&self, namespace: &IggyNamespace) -> 
Option<ShardId> {
+    pub fn find_shard_table_record(&self, namespace: &IggyNamespace) -> 
Option<PartitionLocation> {
         self.shards_table.get(namespace).map(|entry| *entry)
     }
 
-    pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) -> 
ShardId {
+    pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) -> 
PartitionLocation {
         self.shards_table
             .remove(namespace)
-            .map(|(_, shard_id)| shard_id)
+            .map(|(_, location)| location)
             .expect("remove_shard_table_record: namespace not found")
     }
 
     pub fn remove_shard_table_records(
         &self,
         namespaces: &[IggyNamespace],
-    ) -> Vec<(IggyNamespace, ShardId)> {
+    ) -> Vec<(IggyNamespace, PartitionLocation)> {
         namespaces
             .iter()
             .map(|ns| {
-                let (ns, shard_id) = self.shards_table.remove(ns).unwrap();
-                (ns, shard_id)
+                let (ns, location) = self.shards_table.remove(ns).unwrap();
+                (ns, location)
             })
             .collect()
     }
 
-    pub fn insert_shard_table_record(&self, ns: IggyNamespace, shard_id: 
ShardId) {
-        self.shards_table.insert(ns, shard_id);
+    pub fn insert_shard_table_record(&self, ns: IggyNamespace, location: 
PartitionLocation) {
+        self.shards_table.insert(ns, location);
     }
 
     pub fn get_current_shard_namespaces(&self) -> Vec<IggyNamespace> {
         self.shards_table
             .iter()
             .filter_map(|entry| {
-                let (ns, shard_id) = entry.pair();
-                if shard_id.id() == self.id {
+                let (ns, location) = entry.pair();
+                if *location.shard_id == self.id {
                     Some(*ns)
                 } else {
                     None
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index f48279362..8939e87b0 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -33,9 +33,7 @@ use self::tasks::{continuous, periodic};
 use crate::{
     configs::server::ServerConfig,
     io::fs_locks::FsLocks,
-    shard::{
-        namespace::IggyNamespace, task_registry::TaskRegistry, 
transmission::frame::ShardFrame,
-    },
+    shard::{task_registry::TaskRegistry, transmission::frame::ShardFrame},
     slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
     state::file::FileState,
     streaming::{
@@ -46,6 +44,7 @@ use crate::{
 };
 use builder::IggyShardBuilder;
 use dashmap::DashMap;
+use iggy_common::sharding::{IggyNamespace, PartitionLocation};
 use iggy_common::{EncryptorKind, Identifier, IggyError};
 use std::{
     cell::{Cell, RefCell},
@@ -55,10 +54,7 @@ use std::{
     time::{Duration, Instant},
 };
 use tracing::{debug, error, info, instrument};
-use transmission::{
-    connector::{Receiver, ShardConnector, StopReceiver},
-    id::ShardId,
-};
+use transmission::connector::{Receiver, ShardConnector, StopReceiver};
 
 pub const COMPONENT: &str = "SHARD";
 pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
@@ -70,7 +66,7 @@ pub struct IggyShard {
     _version: SemanticVersion,
 
     pub(crate) streams: Streams,
-    pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>>,
+    pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, 
PartitionLocation>>,
     pub(crate) state: FileState,
 
     pub(crate) fs_locks: FsLocks,
@@ -190,9 +186,9 @@ impl IggyShard {
     async fn load_segments(&self) -> Result<(), IggyError> {
         use crate::bootstrap::load_segments;
         for shard_entry in self.shards_table.iter() {
-            let (namespace, shard_id) = shard_entry.pair();
+            let (namespace, location) = shard_entry.pair();
 
-            if **shard_id == self.id {
+            if *location.shard_id == self.id {
                 let stream_id = namespace.stream_id();
                 let topic_id: usize = namespace.topic_id();
                 let partition_id = namespace.partition_id();
diff --git a/core/server/src/shard/namespace.rs 
b/core/server/src/shard/namespace.rs
index 44162e013..289587179 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -19,41 +19,6 @@
 use crate::slab::partitions;
 use iggy_common::Identifier;
 
-// Packed namespace layout (works only on 64bit platforms, but we won't 
support 32bit anyway)
-// +----------------+----------------+----------------+----------------+
-// |    stream_id   |    topic_id    |  partition_id  |     unused     |
-// |    STREAM_BITS |    TOPIC_BITS  | PARTITION_BITS |  (64 - total)  |
-// +----------------+----------------+----------------+----------------+
-
-// TODO Use consts from the `slab` module.
-pub const MAX_STREAMS: usize = 4096;
-pub const MAX_TOPICS: usize = 4096;
-pub const MAX_PARTITIONS: usize = 1_000_000;
-
-const fn bits_required(mut n: u64) -> u32 {
-    if n == 0 {
-        return 1;
-    }
-    let mut b = 0;
-    while n > 0 {
-        b += 1;
-        n >>= 1;
-    }
-    b
-}
-
-pub const STREAM_BITS: u32 = bits_required((MAX_STREAMS - 1) as u64);
-pub const TOPIC_BITS: u32 = bits_required((MAX_TOPICS - 1) as u64);
-pub const PARTITION_BITS: u32 = bits_required((MAX_PARTITIONS - 1) as u64);
-
-pub const PARTITION_SHIFT: u32 = 0;
-pub const TOPIC_SHIFT: u32 = PARTITION_SHIFT + PARTITION_BITS;
-pub const STREAM_SHIFT: u32 = TOPIC_SHIFT + TOPIC_BITS;
-
-pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 1;
-pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1;
-pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1;
-
 #[derive(Debug)]
 pub struct IggyFullNamespace {
     stream: Identifier,
@@ -86,36 +51,3 @@ impl IggyFullNamespace {
         (self.stream, self.topic, self.partition)
     }
 }
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub struct IggyNamespace(u64);
-
-impl IggyNamespace {
-    #[inline]
-    pub fn inner(&self) -> u64 {
-        self.0
-    }
-
-    #[inline]
-    pub fn stream_id(&self) -> usize {
-        ((self.0 >> STREAM_SHIFT) & STREAM_MASK) as usize
-    }
-
-    #[inline]
-    pub fn topic_id(&self) -> usize {
-        ((self.0 >> TOPIC_SHIFT) & TOPIC_MASK) as usize
-    }
-
-    #[inline]
-    pub fn partition_id(&self) -> usize {
-        ((self.0 >> PARTITION_SHIFT) & PARTITION_MASK) as usize
-    }
-
-    #[inline]
-    pub fn new(stream: usize, topic: usize, partition: usize) -> Self {
-        let value = ((stream as u64) & STREAM_MASK) << STREAM_SHIFT
-            | ((topic as u64) & TOPIC_MASK) << TOPIC_SHIFT
-            | ((partition as u64) & PARTITION_MASK) << PARTITION_SHIFT;
-        Self(value)
-    }
-}
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 0c7e4853a..02e8d9d0e 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -19,7 +19,7 @@
 use super::COMPONENT;
 use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
 use crate::shard::IggyShard;
-use crate::shard::namespace::{IggyFullNamespace, IggyNamespace};
+use crate::shard::namespace::IggyFullNamespace;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
@@ -29,6 +29,7 @@ use crate::streaming::traits::MainOps;
 use crate::streaming::{partitions, streams, topics};
 use err_trail::ErrContext;
 use iggy_common::PooledBuffer;
+use iggy_common::sharding::IggyNamespace;
 use iggy_common::{
     BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, 
Identifier, IggyError,
     PollingKind, PollingStrategy,
diff --git a/core/server/src/shard/system/partitions.rs 
b/core/server/src/shard/system/partitions.rs
index e54b6678d..aacbd52c0 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -19,8 +19,6 @@
 use super::COMPONENT;
 use crate::shard::IggyShard;
 use crate::shard::calculate_shard_assignment;
-use crate::shard::namespace::IggyNamespace;
-use crate::shard::transmission::id::ShardId;
 use crate::slab::traits_ext::EntityMarker;
 use crate::slab::traits_ext::IntoComponents;
 use crate::streaming::partitions;
@@ -35,6 +33,7 @@ use crate::streaming::topics;
 use err_trail::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
+use iggy_common::sharding::{IggyNamespace, LocalIdx, PartitionLocation, 
ShardId};
 use tracing::info;
 
 impl IggyShard {
@@ -105,7 +104,10 @@ impl IggyShard {
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, 
partition_id);
             let shard_id = ShardId::new(calculate_shard_assignment(&ns, 
shards_count));
             let is_current_shard = self.id == *shard_id;
-            self.insert_shard_table_record(ns, shard_id);
+            // TODO(hubcio): LocalIdx(0) is wrong.. When IggyPartitions is 
integrated into
+            // IggyShard, this should use the actual index returned by 
IggyPartitions::insert().
+            let location = PartitionLocation::new(shard_id, LocalIdx::new(0));
+            self.insert_shard_table_record(ns, location);
 
             create_partition_file_hierarchy(
                 numeric_stream_id as usize,
@@ -204,11 +206,14 @@ impl IggyShard {
                 "create_partitions_bypass_auth: partition mismatch ID, wrong 
creation order ?!"
             );
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, 
id);
-            let shard_id = self.find_shard_table_record(&ns).unwrap_or_else(|| 
{
+            // TODO(hubcio): when IggyPartitions is integrated, this fallback 
path should
+            // either be removed or use proper index resolution.
+            let location = self.find_shard_table_record(&ns).unwrap_or_else(|| 
{
                 tracing::warn!("WARNING: missing shard table record for 
namespace: {:?}, in the event handler for `CreatedPartitions` event.", ns);
-                ShardId::new(calculate_shard_assignment(&ns, shards_count))
+                let shard_id = ShardId::new(calculate_shard_assignment(&ns, 
shards_count));
+                PartitionLocation::new(shard_id, LocalIdx::new(0))
             });
-            if self.id == *shard_id {
+            if self.id == *location.shard_id {
                 self.init_log(stream_id, topic_id, id).await?;
             }
         }
diff --git a/core/server/src/shard/transmission/id.rs 
b/core/server/src/shard/transmission/id.rs
deleted file mode 100644
index ad16ff9ad..000000000
--- a/core/server/src/shard/transmission/id.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use std::ops::Deref;
-
-// TODO: Maybe pad to cache line size?
-#[derive(Debug, Clone, Copy, Eq, PartialEq)]
-pub struct ShardId {
-    id: u16,
-}
-
-impl ShardId {
-    pub fn new(id: u16) -> Self {
-        Self { id }
-    }
-
-    pub fn id(&self) -> u16 {
-        self.id
-    }
-}
-
-impl Deref for ShardId {
-    type Target = u16;
-
-    fn deref(&self) -> &Self::Target {
-        &self.id
-    }
-}
diff --git a/core/server/src/shard/transmission/mod.rs 
b/core/server/src/shard/transmission/mod.rs
index 400c93652..107bdcbd9 100644
--- a/core/server/src/shard/transmission/mod.rs
+++ b/core/server/src/shard/transmission/mod.rs
@@ -19,5 +19,4 @@
 pub mod connector;
 pub mod event;
 pub mod frame;
-pub mod id;
 pub mod message;


Reply via email to