This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch partitions-cluster
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 8acd5827972acdc4fc31a84c5aa19fbfad35fb24
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Dec 23 10:59:10 2025 +0100

    xd
---
 Cargo.lock                                         |   7 +
 Cargo.toml                                         |   1 +
 DEPENDENCIES.md                                    |   1 +
 core/common/src/lib.rs                             |   1 +
 core/common/src/sharding/local_idx.rs              |  49 ++++++
 core/common/src/sharding/mod.rs                    |  29 ++++
 .../src/shard => common/src/sharding}/namespace.rs |  78 +++------
 core/common/src/sharding/shard_id.rs               |  46 +++++
 core/common/src/sharding/shard_location.rs         |  34 ++++
 core/partitions/Cargo.toml                         |  31 ++++
 core/partitions/src/iggy_partition.rs              |  32 ++++
 core/partitions/src/iggy_partitions.rs             | 193 +++++++++++++++++++++
 core/partitions/src/lib.rs                         |  24 +++
 core/partitions/src/traits.rs                      | 143 +++++++++++++++
 core/server/src/main.rs                            |   7 +-
 core/server/src/shard/builder.rs                   |   7 +-
 core/server/src/shard/communication.rs             |  26 +--
 core/server/src/shard/mod.rs                       |  12 +-
 core/server/src/shard/namespace.rs                 |  74 +-------
 core/server/src/shard/system/partitions.rs         |  12 +-
 core/server/src/shard/transmission/id.rs           |  28 +--
 21 files changed, 656 insertions(+), 179 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0f671da6d..fbc85ac45 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6480,6 +6480,13 @@ dependencies = [
  "syn 2.0.111",
 ]
 
+[[package]]
+name = "partitions"
+version = "0.1.0"
+dependencies = [
+ "iggy_common",
+]
+
 [[package]]
 name = "passterm"
 version = "2.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 2ccb4b1ea..0950ec44d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -44,6 +44,7 @@ members = [
     "core/journal",
     "core/message_bus",
     "core/metadata",
+    "core/partitions",
     "core/sdk",
     "core/server",
     "core/tools",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 8b06b77ac..74b5a2ae6 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -564,6 +564,7 @@ parking_lot_core: 0.9.12, "Apache-2.0 OR MIT",
 parquet: 55.2.0, "Apache-2.0",
 parse-display: 0.9.1, "Apache-2.0 OR MIT",
 parse-display-derive: 0.9.1, "Apache-2.0 OR MIT",
+partitions: 0.1.0, "Apache-2.0",
 passterm: 2.0.1, "BSD-3-Clause",
 password-hash: 0.5.0, "Apache-2.0 OR MIT",
 paste: 1.0.15, "Apache-2.0 OR MIT",
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 1adff1f9f..af04602ce 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -21,6 +21,7 @@ mod commands;
 mod configs;
 mod error;
 mod sender;
+pub mod sharding;
 mod traits;
 mod types;
 mod utils;
diff --git a/core/common/src/sharding/local_idx.rs 
b/core/common/src/sharding/local_idx.rs
new file mode 100644
index 000000000..43f22dfab
--- /dev/null
+++ b/core/common/src/sharding/local_idx.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ops::Deref;
+
+/// Index of a partition within a shard's local partition collection.
+///
+/// This is NOT the same as partition_id. A shard may have partitions with
+/// IDs [0, 2, 4] but their local indices would be [0, 1, 2].
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct LocalIdx(usize);
+
+impl LocalIdx {
+    pub fn new(idx: usize) -> Self {
+        Self(idx)
+    }
+
+    pub fn idx(&self) -> usize {
+        self.0
+    }
+}
+
+impl Deref for LocalIdx {
+    type Target = usize;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl From<usize> for LocalIdx {
+    fn from(idx: usize) -> Self {
+        Self(idx)
+    }
+}
diff --git a/core/common/src/sharding/mod.rs b/core/common/src/sharding/mod.rs
new file mode 100644
index 000000000..0f1ec9fec
--- /dev/null
+++ b/core/common/src/sharding/mod.rs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod local_idx;
+mod namespace;
+mod shard_id;
+mod shard_location;
+
+pub use local_idx::LocalIdx;
+pub use namespace::{
+    IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, PARTITION_BITS, 
PARTITION_MASK,
+    PARTITION_SHIFT, STREAM_BITS, STREAM_MASK, STREAM_SHIFT, TOPIC_BITS, 
TOPIC_MASK, TOPIC_SHIFT,
+};
+pub use shard_id::ShardId;
+pub use shard_location::ShardLocation;
diff --git a/core/server/src/shard/namespace.rs 
b/core/common/src/sharding/namespace.rs
similarity index 53%
copy from core/server/src/shard/namespace.rs
copy to core/common/src/sharding/namespace.rs
index 44162e013..4819f6673 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/common/src/sharding/namespace.rs
@@ -1,31 +1,26 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::slab::partitions;
-use iggy_common::Identifier;
-
-// Packed namespace layout (works only on 64bit platforms, but we won't 
support 32bit anyway)
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Packed namespace layout (works only on 64bit platforms)
 // +----------------+----------------+----------------+----------------+
 // |    stream_id   |    topic_id    |  partition_id  |     unused     |
 // |    STREAM_BITS |    TOPIC_BITS  | PARTITION_BITS |  (64 - total)  |
 // +----------------+----------------+----------------+----------------+
 
-// TODO Use consts from the `slab` module.
 pub const MAX_STREAMS: usize = 4096;
 pub const MAX_TOPICS: usize = 4096;
 pub const MAX_PARTITIONS: usize = 1_000_000;
@@ -54,39 +49,10 @@ pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 
1;
 pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1;
 pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1;
 
-#[derive(Debug)]
-pub struct IggyFullNamespace {
-    stream: Identifier,
-    topic: Identifier,
-    partition: partitions::ContainerId,
-}
-
-impl IggyFullNamespace {
-    pub fn new(stream: Identifier, topic: Identifier, partition: 
partitions::ContainerId) -> Self {
-        Self {
-            stream,
-            topic,
-            partition,
-        }
-    }
-
-    pub fn stream_id(&self) -> &Identifier {
-        &self.stream
-    }
-
-    pub fn topic_id(&self) -> &Identifier {
-        &self.topic
-    }
-
-    pub fn partition_id(&self) -> partitions::ContainerId {
-        self.partition
-    }
-
-    pub fn decompose(self) -> (Identifier, Identifier, 
partitions::ContainerId) {
-        (self.stream, self.topic, self.partition)
-    }
-}
-
+/// Packed namespace identifier for shard assignment.
+///
+/// Encodes stream_id (12 bits), topic_id (12 bits), and partition_id (20 bits)
+/// into a single u64 for efficient hashing and routing.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct IggyNamespace(u64);
 
diff --git a/core/common/src/sharding/shard_id.rs 
b/core/common/src/sharding/shard_id.rs
new file mode 100644
index 000000000..ec546c370
--- /dev/null
+++ b/core/common/src/sharding/shard_id.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ops::Deref;
+
+/// Identifier for a shard in the cluster.
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
+pub struct ShardId(u16);
+
+impl ShardId {
+    pub fn new(id: u16) -> Self {
+        Self(id)
+    }
+
+    pub fn id(&self) -> u16 {
+        self.0
+    }
+}
+
+impl Deref for ShardId {
+    type Target = u16;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl From<u16> for ShardId {
+    fn from(id: u16) -> Self {
+        Self(id)
+    }
+}
diff --git a/core/common/src/sharding/shard_location.rs 
b/core/common/src/sharding/shard_location.rs
new file mode 100644
index 000000000..d20c1929d
--- /dev/null
+++ b/core/common/src/sharding/shard_location.rs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{LocalIdx, ShardId};
+
+/// Location of a partition: which shard owns it and its local index within 
that shard.
+#[derive(Debug, Clone, Copy)]
+pub struct ShardLocation {
+    pub shard_id: ShardId,
+    pub local_idx: LocalIdx,
+}
+
+impl ShardLocation {
+    pub fn new(shard_id: ShardId, local_idx: LocalIdx) -> Self {
+        Self {
+            shard_id,
+            local_idx,
+        }
+    }
+}
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
new file mode 100644
index 000000000..2a343ba43
--- /dev/null
+++ b/core/partitions/Cargo.toml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "partitions"
+version = "0.1.0"
+description = "Iggy partitions abstraction for clustering support"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming", "partitions"]
+homepage = "https://iggy.apache.org";
+documentation = "https://iggy.apache.org/docs";
+repository = "https://github.com/apache/iggy";
+readme = "../../README.md"
+
+[dependencies]
+iggy_common = { path = "../common" }
diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
new file mode 100644
index 000000000..cfe95f232
--- /dev/null
+++ b/core/partitions/src/iggy_partition.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Placeholder struct for partition.
+///
+/// Intentionally empty for now. The actual partition implementation
+/// will be added when integrating with storage. This serves as a marker type
+/// for the trait implementation.
+#[derive(Debug, Default)]
+pub struct IggyPartition {
+    // Will be populated when integrating with actual storage
+}
+
+impl IggyPartition {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
new file mode 100644
index 000000000..c2085858e
--- /dev/null
+++ b/core/partitions/src/iggy_partitions.rs
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{
+    IggyPartition, Partitions, PollMetadata, PollingArgs, PollingConsumer, 
SendMessagesResult,
+};
+use iggy_common::sharding::{IggyNamespace, LocalIdx, ShardId};
+use iggy_common::{ConsumerOffsetInfo, IggyError};
+use std::collections::HashMap;
+
+/// Per-shard collection of all partitions.
+///
+/// This struct manages ALL partitions assigned to a single shard, regardless
+/// of which stream/topic they belong to. This flat structure enables:
+/// - Direct O(1) access via LocalIdx
+/// - Efficient iteration over all partitions on a shard
+/// - Clean separation from metadata hierarchy (streams/topics)
+///
+/// Note: The partition_id within IggyNamespace may NOT equal the Vec index.
+/// For example, shard 0 might have partition_ids [0, 2, 4] while shard 1
+/// has partition_ids [1, 3, 5]. The `LocalIdx` provides the actual index
+/// into the `partitions` Vec.
+pub struct IggyPartitions {
+    shard_id: ShardId,
+    partitions: Vec<IggyPartition>,
+    namespace_to_local: HashMap<IggyNamespace, LocalIdx>,
+}
+
+impl IggyPartitions {
+    pub fn new(shard_id: ShardId) -> Self {
+        Self {
+            shard_id,
+            partitions: Vec::new(),
+            namespace_to_local: HashMap::new(),
+        }
+    }
+
+    pub fn with_capacity(shard_id: ShardId, capacity: usize) -> Self {
+        Self {
+            shard_id,
+            partitions: Vec::with_capacity(capacity),
+            namespace_to_local: HashMap::with_capacity(capacity),
+        }
+    }
+
+    pub fn shard_id(&self) -> ShardId {
+        self.shard_id
+    }
+
+    pub fn len(&self) -> usize {
+        self.partitions.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.partitions.is_empty()
+    }
+
+    /// Get partition by local index.
+    pub fn get(&self, local_idx: LocalIdx) -> Option<&IggyPartition> {
+        self.partitions.get(*local_idx)
+    }
+
+    /// Get mutable partition by local index.
+    pub fn get_mut(&mut self, local_idx: LocalIdx) -> Option<&mut 
IggyPartition> {
+        self.partitions.get_mut(*local_idx)
+    }
+
+    /// Lookup local index by namespace.
+    pub fn local_idx(&self, namespace: &IggyNamespace) -> Option<LocalIdx> {
+        self.namespace_to_local.get(namespace).copied()
+    }
+
+    /// Insert a new partition and return its local index.
+    pub fn insert(&mut self, namespace: IggyNamespace, partition: 
IggyPartition) -> LocalIdx {
+        let local_idx = LocalIdx::new(self.partitions.len());
+        self.partitions.push(partition);
+        self.namespace_to_local.insert(namespace, local_idx);
+        local_idx
+    }
+
+    /// Remove a partition by namespace. Returns the removed partition if 
found.
+    pub fn remove(&mut self, namespace: &IggyNamespace) -> 
Option<IggyPartition> {
+        let local_idx = self.namespace_to_local.remove(namespace)?;
+        let idx = *local_idx;
+
+        if idx >= self.partitions.len() {
+            return None;
+        }
+
+        // Swap-remove for O(1) deletion
+        let partition = self.partitions.swap_remove(idx);
+
+        // If we swapped an element, update its index in the map
+        if idx < self.partitions.len() {
+            // Find the namespace that was at the last position (now at idx)
+            for (_ns, lidx) in self.namespace_to_local.iter_mut() {
+                if **lidx == self.partitions.len() {
+                    *lidx = LocalIdx::new(idx);
+                    break;
+                }
+            }
+        }
+
+        Some(partition)
+    }
+}
+
+/// Placeholder implementation - actual logic will be added when integrating 
with storage.
+impl Partitions for IggyPartitions {
+    type MessageBatch = ();
+    type MessageBatchSet = ();
+
+    async fn poll_messages(
+        &self,
+        _namespace: &IggyNamespace,
+        _local_idx: LocalIdx,
+        _consumer: PollingConsumer,
+        _args: PollingArgs,
+    ) -> Result<(PollMetadata, Self::MessageBatchSet), IggyError> {
+        Ok((PollMetadata::new(0, 0), ()))
+    }
+
+    async fn send_messages(
+        &self,
+        _namespace: &IggyNamespace,
+        _local_idx: LocalIdx,
+        _batch: Self::MessageBatch,
+    ) -> Result<SendMessagesResult, IggyError> {
+        Ok(SendMessagesResult { messages_count: 0 })
+    }
+
+    async fn create_partition(&self, _namespace: &IggyNamespace) -> 
Result<LocalIdx, IggyError> {
+        Err(IggyError::FeatureUnavailable)
+    }
+
+    async fn delete_partitions(
+        &self,
+        _namespaces: &[IggyNamespace],
+    ) -> Result<Vec<LocalIdx>, IggyError> {
+        Err(IggyError::FeatureUnavailable)
+    }
+
+    async fn get_consumer_offset(
+        &self,
+        _namespace: &IggyNamespace,
+        _local_idx: LocalIdx,
+        _consumer: PollingConsumer,
+    ) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
+        Ok(None)
+    }
+
+    async fn store_consumer_offset(
+        &self,
+        _namespace: &IggyNamespace,
+        _local_idx: LocalIdx,
+        _consumer: PollingConsumer,
+        _offset: u64,
+    ) -> Result<(), IggyError> {
+        Ok(())
+    }
+
+    async fn delete_consumer_offset(
+        &self,
+        _namespace: &IggyNamespace,
+        _local_idx: LocalIdx,
+        _consumer: PollingConsumer,
+    ) -> Result<(), IggyError> {
+        Ok(())
+    }
+
+    async fn flush_unsaved_buffer(
+        &self,
+        _namespace: &IggyNamespace,
+        _local_idx: LocalIdx,
+        _fsync: bool,
+    ) -> Result<(), IggyError> {
+        Ok(())
+    }
+}
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
new file mode 100644
index 000000000..2c1c13078
--- /dev/null
+++ b/core/partitions/src/lib.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod iggy_partition;
+mod iggy_partitions;
+mod traits;
+
+pub use iggy_partition::IggyPartition;
+pub use iggy_partitions::IggyPartitions;
+pub use traits::{Partitions, PollMetadata, PollingArgs, PollingConsumer, 
SendMessagesResult};
diff --git a/core/partitions/src/traits.rs b/core/partitions/src/traits.rs
new file mode 100644
index 000000000..bf6a751bc
--- /dev/null
+++ b/core/partitions/src/traits.rs
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::sharding::{IggyNamespace, LocalIdx};
+use iggy_common::{ConsumerOffsetInfo, IggyError, PollingStrategy};
+use std::future::Future;
+
+/// Arguments for polling messages from a partition.
+#[derive(Debug, Clone)]
+pub struct PollingArgs {
+    pub strategy: PollingStrategy,
+    pub count: u32,
+    pub auto_commit: bool,
+}
+
+impl PollingArgs {
+    pub fn new(strategy: PollingStrategy, count: u32, auto_commit: bool) -> 
Self {
+        Self {
+            strategy,
+            count,
+            auto_commit,
+        }
+    }
+}
+
+/// Metadata returned from a poll operation.
+#[derive(Debug, Clone)]
+pub struct PollMetadata {
+    pub partition_id: u32,
+    pub current_offset: u64,
+}
+
+impl PollMetadata {
+    pub fn new(partition_id: u32, current_offset: u64) -> Self {
+        Self {
+            partition_id,
+            current_offset,
+        }
+    }
+}
+
+/// Result of sending messages.
+#[derive(Debug)]
+pub struct SendMessagesResult {
+    pub messages_count: u32,
+}
+
+/// Consumer identification for offset operations.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum PollingConsumer {
+    /// Regular consumer with (consumer_id, partition_id)
+    Consumer(usize, usize),
+    /// Consumer group with (group_id, member_id)
+    ConsumerGroup(usize, usize),
+}
+
+/// The core abstraction for partition operations in clustering.
+///
+/// This trait defines the data-plane operations for partitions that
+/// need to be coordinated across a cluster using viewstamped replication.
+/// Implementations can vary between single-node and clustered deployments.
+pub trait Partitions {
+    /// Message batch type for sending messages.
+    type MessageBatch;
+    /// Message batch set type for poll results.
+    type MessageBatchSet;
+
+    /// Poll messages from a partition.
+    fn poll_messages(
+        &self,
+        namespace: &IggyNamespace,
+        local_idx: LocalIdx,
+        consumer: PollingConsumer,
+        args: PollingArgs,
+    ) -> impl Future<Output = Result<(PollMetadata, Self::MessageBatchSet), 
IggyError>> + Send;
+
+    /// Send/append messages to a partition.
+    fn send_messages(
+        &self,
+        namespace: &IggyNamespace,
+        local_idx: LocalIdx,
+        batch: Self::MessageBatch,
+    ) -> impl Future<Output = Result<SendMessagesResult, IggyError>> + Send;
+
+    /// Create a new partition.
+    fn create_partition(
+        &self,
+        namespace: &IggyNamespace,
+    ) -> impl Future<Output = Result<LocalIdx, IggyError>> + Send;
+
+    /// Delete partitions from the collection.
+    fn delete_partitions(
+        &self,
+        namespaces: &[IggyNamespace],
+    ) -> impl Future<Output = Result<Vec<LocalIdx>, IggyError>> + Send;
+
+    /// Get the stored offset for a consumer on a partition.
+    fn get_consumer_offset(
+        &self,
+        namespace: &IggyNamespace,
+        local_idx: LocalIdx,
+        consumer: PollingConsumer,
+    ) -> impl Future<Output = Result<Option<ConsumerOffsetInfo>, IggyError>> + 
Send;
+
+    /// Store/update the offset for a consumer on a partition.
+    fn store_consumer_offset(
+        &self,
+        namespace: &IggyNamespace,
+        local_idx: LocalIdx,
+        consumer: PollingConsumer,
+        offset: u64,
+    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+
+    /// Delete the stored offset for a consumer on a partition.
+    fn delete_consumer_offset(
+        &self,
+        namespace: &IggyNamespace,
+        local_idx: LocalIdx,
+        consumer: PollingConsumer,
+    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+
+    /// Flush unsaved messages to disk.
+    fn flush_unsaved_buffer(
+        &self,
+        namespace: &IggyNamespace,
+        local_idx: LocalIdx,
+        fsync: bool,
+    ) -> impl Future<Output = Result<(), IggyError>> + Send;
+}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 23e7c515d..869f0b9a7 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -23,6 +23,7 @@ use dashmap::DashMap;
 use dotenvy::dotenv;
 use err_trail::ErrContext;
 use figlet_rs::FIGfont;
+use iggy_common::sharding::{LocalIdx, ShardLocation};
 use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError, MemoryPool};
 use server::SEMANTIC_VERSION;
 use server::args::Args;
@@ -314,7 +315,7 @@ fn main() -> Result<(), ServerError> {
         // Shared resources bootstrap.
         let shards_table = 
Box::new(DashMap::with_capacity(SHARDS_TABLE_CAPACITY));
         let shards_table = Box::leak(shards_table);
-        let shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>> = 
shards_table.into();
+        let shards_table: EternalPtr<DashMap<IggyNamespace, ShardLocation>> = 
shards_table.into();
 
         let client_manager = Box::new(DashMap::new());
         let client_manager = Box::leak(client_manager);
@@ -338,7 +339,9 @@ fn main() -> Result<(), ServerError> {
                                     &ns,
                                     shard_assignment.len() as u32,
                                 ));
-                                shards_table.insert(ns, shard_id);
+                                // LocalIdx is a placeholder until 
IggyPartitions integration
+                                let location = ShardLocation::new(shard_id, 
LocalIdx::new(0));
+                                shards_table.insert(ns, location);
                             }
                         });
                     }
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 188452fe8..62032b32f 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -18,7 +18,7 @@
 
 use super::{
     IggyShard, TaskRegistry, transmission::connector::ShardConnector,
-    transmission::frame::ShardFrame, transmission::id::ShardId,
+    transmission::frame::ShardFrame,
 };
 use crate::{
     configs::server::ServerConfig,
@@ -33,13 +33,14 @@ use crate::{
 };
 use dashmap::DashMap;
 use iggy_common::EncryptorKind;
+use iggy_common::sharding::ShardLocation;
 use std::{cell::Cell, rc::Rc, sync::atomic::AtomicBool};
 
 #[derive(Default)]
 pub struct IggyShardBuilder {
     id: Option<u16>,
     streams: Option<Streams>,
-    shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardId>>>,
+    shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardLocation>>>,
     state: Option<FileState>,
     users: Option<Users>,
     client_manager: Option<ClientManager>,
@@ -69,7 +70,7 @@ impl IggyShardBuilder {
 
     pub fn shards_table(
         mut self,
-        shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>>,
+        shards_table: EternalPtr<DashMap<IggyNamespace, ShardLocation>>,
     ) -> Self {
         self.shards_table = Some(shards_table);
         self
diff --git a/core/server/src/shard/communication.rs 
b/core/server/src/shard/communication.rs
index 59dc11183..e3c780c8f 100644
--- a/core/server/src/shard/communication.rs
+++ b/core/server/src/shard/communication.rs
@@ -22,13 +22,13 @@ use crate::shard::{
         connector::ShardConnector,
         event::ShardEvent,
         frame::ShardFrame,
-        id::ShardId,
         message::{ShardMessage, ShardSendRequestResult},
     },
 };
 use futures::future::join_all;
 use hash32::{Hasher, Murmur3Hasher};
 use iggy_common::IggyError;
+use iggy_common::sharding::ShardLocation;
 use std::hash::Hasher as _;
 use tracing::{error, info, warn};
 
@@ -138,48 +138,48 @@ impl IggyShard {
     }
 
     pub fn find_shard(&self, namespace: &IggyNamespace) -> 
Option<&ShardConnector<ShardFrame>> {
-        self.shards_table.get(namespace).map(|shard_id| {
+        self.shards_table.get(namespace).map(|location| {
             self.shards
                 .iter()
-                .find(|shard| shard.id == shard_id.id())
+                .find(|shard| shard.id == *location.shard_id)
                 .expect("Shard not found in the shards table.")
         })
     }
 
-    pub fn find_shard_table_record(&self, namespace: &IggyNamespace) -> 
Option<ShardId> {
+    pub fn find_shard_table_record(&self, namespace: &IggyNamespace) -> 
Option<ShardLocation> {
         self.shards_table.get(namespace).map(|entry| *entry)
     }
 
-    pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) -> 
ShardId {
+    pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) -> 
ShardLocation {
         self.shards_table
             .remove(namespace)
-            .map(|(_, shard_id)| shard_id)
+            .map(|(_, location)| location)
             .expect("remove_shard_table_record: namespace not found")
     }
 
     pub fn remove_shard_table_records(
         &self,
         namespaces: &[IggyNamespace],
-    ) -> Vec<(IggyNamespace, ShardId)> {
+    ) -> Vec<(IggyNamespace, ShardLocation)> {
         namespaces
             .iter()
             .map(|ns| {
-                let (ns, shard_id) = self.shards_table.remove(ns).unwrap();
-                (ns, shard_id)
+                let (ns, location) = self.shards_table.remove(ns).unwrap();
+                (ns, location)
             })
             .collect()
     }
 
-    pub fn insert_shard_table_record(&self, ns: IggyNamespace, shard_id: 
ShardId) {
-        self.shards_table.insert(ns, shard_id);
+    pub fn insert_shard_table_record(&self, ns: IggyNamespace, location: 
ShardLocation) {
+        self.shards_table.insert(ns, location);
     }
 
     pub fn get_current_shard_namespaces(&self) -> Vec<IggyNamespace> {
         self.shards_table
             .iter()
             .filter_map(|entry| {
-                let (ns, shard_id) = entry.pair();
-                if shard_id.id() == self.id {
+                let (ns, location) = entry.pair();
+                if *location.shard_id == self.id {
                     Some(*ns)
                 } else {
                     None
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index f48279362..178fa6308 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -46,6 +46,7 @@ use crate::{
 };
 use builder::IggyShardBuilder;
 use dashmap::DashMap;
+use iggy_common::sharding::ShardLocation;
 use iggy_common::{EncryptorKind, Identifier, IggyError};
 use std::{
     cell::{Cell, RefCell},
@@ -55,10 +56,7 @@ use std::{
     time::{Duration, Instant},
 };
 use tracing::{debug, error, info, instrument};
-use transmission::{
-    connector::{Receiver, ShardConnector, StopReceiver},
-    id::ShardId,
-};
+use transmission::connector::{Receiver, ShardConnector, StopReceiver};
 
 pub const COMPONENT: &str = "SHARD";
 pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
@@ -70,7 +68,7 @@ pub struct IggyShard {
     _version: SemanticVersion,
 
     pub(crate) streams: Streams,
-    pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>>,
+    pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardLocation>>,
     pub(crate) state: FileState,
 
     pub(crate) fs_locks: FsLocks,
@@ -190,9 +188,9 @@ impl IggyShard {
     async fn load_segments(&self) -> Result<(), IggyError> {
         use crate::bootstrap::load_segments;
         for shard_entry in self.shards_table.iter() {
-            let (namespace, shard_id) = shard_entry.pair();
+            let (namespace, location) = shard_entry.pair();
 
-            if **shard_id == self.id {
+            if *location.shard_id == self.id {
                 let stream_id = namespace.stream_id();
                 let topic_id: usize = namespace.topic_id();
                 let partition_id = namespace.partition_id();
diff --git a/core/server/src/shard/namespace.rs 
b/core/server/src/shard/namespace.rs
index 44162e013..5b9dea4aa 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -16,44 +16,15 @@
  * under the License.
  */
 
+// Re-export from common for backward compatibility
+pub use iggy_common::sharding::{
+    IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, PARTITION_BITS, 
PARTITION_MASK,
+    PARTITION_SHIFT, STREAM_BITS, STREAM_MASK, STREAM_SHIFT, TOPIC_BITS, 
TOPIC_MASK, TOPIC_SHIFT,
+};
+
 use crate::slab::partitions;
 use iggy_common::Identifier;
 
-// Packed namespace layout (works only on 64bit platforms, but we won't 
support 32bit anyway)
-// +----------------+----------------+----------------+----------------+
-// |    stream_id   |    topic_id    |  partition_id  |     unused     |
-// |    STREAM_BITS |    TOPIC_BITS  | PARTITION_BITS |  (64 - total)  |
-// +----------------+----------------+----------------+----------------+
-
-// TODO Use consts from the `slab` module.
-pub const MAX_STREAMS: usize = 4096;
-pub const MAX_TOPICS: usize = 4096;
-pub const MAX_PARTITIONS: usize = 1_000_000;
-
-const fn bits_required(mut n: u64) -> u32 {
-    if n == 0 {
-        return 1;
-    }
-    let mut b = 0;
-    while n > 0 {
-        b += 1;
-        n >>= 1;
-    }
-    b
-}
-
-pub const STREAM_BITS: u32 = bits_required((MAX_STREAMS - 1) as u64);
-pub const TOPIC_BITS: u32 = bits_required((MAX_TOPICS - 1) as u64);
-pub const PARTITION_BITS: u32 = bits_required((MAX_PARTITIONS - 1) as u64);
-
-pub const PARTITION_SHIFT: u32 = 0;
-pub const TOPIC_SHIFT: u32 = PARTITION_SHIFT + PARTITION_BITS;
-pub const STREAM_SHIFT: u32 = TOPIC_SHIFT + TOPIC_BITS;
-
-pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 1;
-pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1;
-pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1;
-
 #[derive(Debug)]
 pub struct IggyFullNamespace {
     stream: Identifier,
@@ -86,36 +57,3 @@ impl IggyFullNamespace {
         (self.stream, self.topic, self.partition)
     }
 }
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub struct IggyNamespace(u64);
-
-impl IggyNamespace {
-    #[inline]
-    pub fn inner(&self) -> u64 {
-        self.0
-    }
-
-    #[inline]
-    pub fn stream_id(&self) -> usize {
-        ((self.0 >> STREAM_SHIFT) & STREAM_MASK) as usize
-    }
-
-    #[inline]
-    pub fn topic_id(&self) -> usize {
-        ((self.0 >> TOPIC_SHIFT) & TOPIC_MASK) as usize
-    }
-
-    #[inline]
-    pub fn partition_id(&self) -> usize {
-        ((self.0 >> PARTITION_SHIFT) & PARTITION_MASK) as usize
-    }
-
-    #[inline]
-    pub fn new(stream: usize, topic: usize, partition: usize) -> Self {
-        let value = ((stream as u64) & STREAM_MASK) << STREAM_SHIFT
-            | ((topic as u64) & TOPIC_MASK) << TOPIC_SHIFT
-            | ((partition as u64) & PARTITION_MASK) << PARTITION_SHIFT;
-        Self(value)
-    }
-}
diff --git a/core/server/src/shard/system/partitions.rs 
b/core/server/src/shard/system/partitions.rs
index b95847303..7832a3975 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -35,6 +35,7 @@ use crate::streaming::topics;
 use err_trail::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
+use iggy_common::sharding::{LocalIdx, ShardLocation};
 use tracing::info;
 
 impl IggyShard {
@@ -105,7 +106,9 @@ impl IggyShard {
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, 
partition_id);
             let shard_id = ShardId::new(calculate_shard_assignment(&ns, 
shards_count));
             let is_current_shard = self.id == *shard_id;
-            self.insert_shard_table_record(ns, shard_id);
+            // LocalIdx is a placeholder until IggyPartitions integration
+            let location = ShardLocation::new(shard_id, LocalIdx::new(0));
+            self.insert_shard_table_record(ns, location);
 
             create_partition_file_hierarchy(
                 numeric_stream_id as usize,
@@ -204,11 +207,12 @@ impl IggyShard {
                 "create_partitions_bypass_auth: partition mismatch ID, wrong 
creation order ?!"
             );
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, 
id);
-            let shard_id = self.find_shard_table_record(&ns).unwrap_or_else(|| 
{
+            let location = self.find_shard_table_record(&ns).unwrap_or_else(|| 
{
                 tracing::warn!("WARNING: missing shard table record for 
namespace: {:?}, in the event handler for `CreatedPartitions` event.", ns);
-                ShardId::new(calculate_shard_assignment(&ns, shards_count))
+                let shard_id = ShardId::new(calculate_shard_assignment(&ns, 
shards_count));
+                ShardLocation::new(shard_id, LocalIdx::new(0))
             });
-            if self.id == *shard_id {
+            if self.id == *location.shard_id {
                 self.init_log(stream_id, topic_id, id).await?;
             }
         }
diff --git a/core/server/src/shard/transmission/id.rs 
b/core/server/src/shard/transmission/id.rs
index ad16ff9ad..86d4443db 100644
--- a/core/server/src/shard/transmission/id.rs
+++ b/core/server/src/shard/transmission/id.rs
@@ -1,5 +1,4 @@
 /* Licensed to the Apache Software Foundation (ASF) under one
- * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -17,28 +16,5 @@
  * under the License.
  */
 
-use std::ops::Deref;
-
-// TODO: Maybe pad to cache line size?
-#[derive(Debug, Clone, Copy, Eq, PartialEq)]
-pub struct ShardId {
-    id: u16,
-}
-
-impl ShardId {
-    pub fn new(id: u16) -> Self {
-        Self { id }
-    }
-
-    pub fn id(&self) -> u16 {
-        self.id
-    }
-}
-
-impl Deref for ShardId {
-    type Target = u16;
-
-    fn deref(&self) -> &Self::Target {
-        &self.id
-    }
-}
+// Re-export from common for backward compatibility
+pub use iggy_common::sharding::ShardId;


Reply via email to