This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch partitions-cluster in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0a8b3dea2795ed79a71367dc0f2f034b1daf3957 Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Dec 23 10:59:10 2025 +0100 feat(partitions): add partitions abstraction for clustering Introduce foundation for cluster-aware partition management: - Create `Partitions` trait (poll/send messages, consumer offsets, CRUD) - Add `IggyPartitions` with flat Vec storage and O(1) namespace lookup - Move sharding types to `iggy_common::sharding` (IggyNamespace, ShardId) - Add `LocalIdx` and `ShardLocation` for non-contiguous partition IDs - Update `shards_table`: IggyNamespace -> ShardLocation --- Cargo.lock | 7 + Cargo.toml | 1 + DEPENDENCIES.md | 1 + core/common/src/lib.rs | 1 + core/common/src/sharding/local_idx.rs | 49 ++++++ core/common/src/sharding/mod.rs | 29 +++ .../src/shard => common/src/sharding}/namespace.rs | 78 +++----- core/common/src/sharding/shard_id.rs | 46 +++++ core/common/src/sharding/shard_location.rs | 34 ++++ core/partitions/Cargo.toml | 31 ++++ core/partitions/src/iggy_partition.rs | 32 ++++ core/partitions/src/iggy_partitions.rs | 196 +++++++++++++++++++++ core/partitions/src/lib.rs | 24 +++ core/partitions/src/traits.rs | 144 +++++++++++++++ core/server/src/main.rs | 7 +- core/server/src/shard/builder.rs | 7 +- core/server/src/shard/communication.rs | 26 +-- core/server/src/shard/mod.rs | 12 +- core/server/src/shard/namespace.rs | 74 +------- core/server/src/shard/system/partitions.rs | 15 +- core/server/src/shard/transmission/id.rs | 28 +-- 21 files changed, 663 insertions(+), 179 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f671da6d..fbc85ac45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6480,6 +6480,13 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "partitions" +version = "0.1.0" +dependencies = [ + "iggy_common", +] + [[package]] name = "passterm" version = "2.0.1" diff --git a/Cargo.toml b/Cargo.toml index 2ccb4b1ea..0950ec44d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ members = [ "core/journal", "core/message_bus", "core/metadata", + "core/partitions", "core/sdk", "core/server", "core/tools", diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 8b06b77ac..74b5a2ae6 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -564,6 +564,7 @@ parking_lot_core: 0.9.12, "Apache-2.0 OR MIT", parquet: 55.2.0, "Apache-2.0", parse-display: 0.9.1, "Apache-2.0 OR MIT", parse-display-derive: 0.9.1, "Apache-2.0 OR MIT", +partitions: 0.1.0, "Apache-2.0", passterm: 2.0.1, "BSD-3-Clause", password-hash: 0.5.0, "Apache-2.0 OR MIT", paste: 1.0.15, "Apache-2.0 OR MIT", diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 1adff1f9f..af04602ce 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -21,6 +21,7 @@ mod commands; mod configs; mod error; mod sender; +pub mod sharding; mod traits; mod types; mod utils; diff --git a/core/common/src/sharding/local_idx.rs b/core/common/src/sharding/local_idx.rs new file mode 100644 index 000000000..43f22dfab --- /dev/null +++ b/core/common/src/sharding/local_idx.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Deref; + +/// Index of a partition within a shard's local partition collection. +/// +/// This is NOT the same as partition_id. A shard may have partitions with +/// IDs [0, 2, 4] but their local indices would be [0, 1, 2]. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct LocalIdx(usize); + +impl LocalIdx { + pub fn new(idx: usize) -> Self { + Self(idx) + } + + pub fn idx(&self) -> usize { + self.0 + } +} + +impl Deref for LocalIdx { + type Target = usize; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From<usize> for LocalIdx { + fn from(idx: usize) -> Self { + Self(idx) + } +} diff --git a/core/common/src/sharding/mod.rs b/core/common/src/sharding/mod.rs new file mode 100644 index 000000000..0f1ec9fec --- /dev/null +++ b/core/common/src/sharding/mod.rs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod local_idx; +mod namespace; +mod shard_id; +mod shard_location; + +pub use local_idx::LocalIdx; +pub use namespace::{ + IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, PARTITION_BITS, PARTITION_MASK, + PARTITION_SHIFT, STREAM_BITS, STREAM_MASK, STREAM_SHIFT, TOPIC_BITS, TOPIC_MASK, TOPIC_SHIFT, +}; +pub use shard_id::ShardId; +pub use shard_location::ShardLocation; diff --git a/core/server/src/shard/namespace.rs b/core/common/src/sharding/namespace.rs similarity index 53% copy from core/server/src/shard/namespace.rs copy to core/common/src/sharding/namespace.rs index 44162e013..4819f6673 100644 --- a/core/server/src/shard/namespace.rs +++ b/core/common/src/sharding/namespace.rs @@ -1,31 +1,26 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use crate::slab::partitions; -use iggy_common::Identifier; - -// Packed namespace layout (works only on 64bit platforms, but we won't support 32bit anyway) +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Packed namespace layout (works only on 64bit platforms) // +----------------+----------------+----------------+----------------+ // | stream_id | topic_id | partition_id | unused | // | STREAM_BITS | TOPIC_BITS | PARTITION_BITS | (64 - total) | // +----------------+----------------+----------------+----------------+ -// TODO Use consts from the `slab` module. pub const MAX_STREAMS: usize = 4096; pub const MAX_TOPICS: usize = 4096; pub const MAX_PARTITIONS: usize = 1_000_000; @@ -54,39 +49,10 @@ pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 1; pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1; pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1; -#[derive(Debug)] -pub struct IggyFullNamespace { - stream: Identifier, - topic: Identifier, - partition: partitions::ContainerId, -} - -impl IggyFullNamespace { - pub fn new(stream: Identifier, topic: Identifier, partition: partitions::ContainerId) -> Self { - Self { - stream, - topic, - partition, - } - } - - pub fn stream_id(&self) -> &Identifier { - &self.stream - } - - pub fn topic_id(&self) -> &Identifier { - &self.topic - } - - pub fn partition_id(&self) -> partitions::ContainerId { - self.partition - } - - pub fn decompose(self) -> (Identifier, Identifier, partitions::ContainerId) { - (self.stream, self.topic, self.partition) - } -} - +/// Packed namespace identifier for shard assignment. +/// +/// Encodes stream_id (12 bits), topic_id (12 bits), and partition_id (20 bits) +/// into a single u64 for efficient hashing and routing. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct IggyNamespace(u64); diff --git a/core/common/src/sharding/shard_id.rs b/core/common/src/sharding/shard_id.rs new file mode 100644 index 000000000..ec546c370 --- /dev/null +++ b/core/common/src/sharding/shard_id.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Deref; + +/// Identifier for a shard in the cluster. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +pub struct ShardId(u16); + +impl ShardId { + pub fn new(id: u16) -> Self { + Self(id) + } + + pub fn id(&self) -> u16 { + self.0 + } +} + +impl Deref for ShardId { + type Target = u16; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From<u16> for ShardId { + fn from(id: u16) -> Self { + Self(id) + } +} diff --git a/core/common/src/sharding/shard_location.rs b/core/common/src/sharding/shard_location.rs new file mode 100644 index 000000000..d20c1929d --- /dev/null +++ b/core/common/src/sharding/shard_location.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{LocalIdx, ShardId}; + +/// Location of a partition: which shard owns it and its local index within that shard. +#[derive(Debug, Clone, Copy)] +pub struct ShardLocation { + pub shard_id: ShardId, + pub local_idx: LocalIdx, +} + +impl ShardLocation { + pub fn new(shard_id: ShardId, local_idx: LocalIdx) -> Self { + Self { + shard_id, + local_idx, + } + } +} diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml new file mode 100644 index 000000000..2a343ba43 --- /dev/null +++ b/core/partitions/Cargo.toml @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "partitions" +version = "0.1.0" +description = "Iggy partitions abstraction for clustering support" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "partitions"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[dependencies] +iggy_common = { path = "../common" } diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs new file mode 100644 index 000000000..cfe95f232 --- /dev/null +++ b/core/partitions/src/iggy_partition.rs @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Placeholder struct for partition. +/// +/// Intentionally empty for now. The actual partition implementation +/// will be added when integrating with storage. This serves as a marker type +/// for the trait implementation. +#[derive(Debug, Default)] +pub struct IggyPartition { + // Will be populated when integrating with actual storage +} + +impl IggyPartition { + pub fn new() -> Self { + Self {} + } +} diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs new file mode 100644 index 000000000..60fd27a89 --- /dev/null +++ b/core/partitions/src/iggy_partitions.rs @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{ + IggyPartition, Partitions, PollMetadata, PollingArgs, PollingConsumer, SendMessagesResult, +}; +use iggy_common::sharding::{IggyNamespace, LocalIdx, ShardId}; +use iggy_common::{ConsumerOffsetInfo, IggyError}; +use std::collections::HashMap; + +/// Per-shard collection of all partitions. +/// +/// This struct manages ALL partitions assigned to a single shard, regardless +/// of which stream/topic they belong to. This flat structure enables: +/// - Direct O(1) access via LocalIdx +/// - Efficient iteration over all partitions on a shard +/// - Clean separation from metadata hierarchy (streams/topics) +/// +/// Note: The partition_id within IggyNamespace may NOT equal the Vec index. +/// For example, shard 0 might have partition_ids [0, 2, 4] while shard 1 +/// has partition_ids [1, 3, 5]. The `LocalIdx` provides the actual index +/// into the `partitions` Vec. +pub struct IggyPartitions { + shard_id: ShardId, + partitions: Vec<IggyPartition>, + namespace_to_local: HashMap<IggyNamespace, LocalIdx>, +} + +impl IggyPartitions { + pub fn new(shard_id: ShardId) -> Self { + Self { + shard_id, + partitions: Vec::new(), + namespace_to_local: HashMap::new(), + } + } + + pub fn with_capacity(shard_id: ShardId, capacity: usize) -> Self { + Self { + shard_id, + partitions: Vec::with_capacity(capacity), + namespace_to_local: HashMap::with_capacity(capacity), + } + } + + pub fn shard_id(&self) -> ShardId { + self.shard_id + } + + pub fn len(&self) -> usize { + self.partitions.len() + } + + pub fn is_empty(&self) -> bool { + self.partitions.is_empty() + } + + /// Get partition by local index. + pub fn get(&self, local_idx: LocalIdx) -> Option<&IggyPartition> { + self.partitions.get(*local_idx) + } + + /// Get mutable partition by local index. + pub fn get_mut(&mut self, local_idx: LocalIdx) -> Option<&mut IggyPartition> { + self.partitions.get_mut(*local_idx) + } + + /// Lookup local index by namespace. + pub fn local_idx(&self, namespace: &IggyNamespace) -> Option<LocalIdx> { + self.namespace_to_local.get(namespace).copied() + } + + /// Insert a new partition and return its local index. + pub fn insert(&mut self, namespace: IggyNamespace, partition: IggyPartition) -> LocalIdx { + let local_idx = LocalIdx::new(self.partitions.len()); + self.partitions.push(partition); + self.namespace_to_local.insert(namespace, local_idx); + local_idx + } + + /// Remove a partition by namespace. Returns the removed partition if found. + pub fn remove(&mut self, namespace: &IggyNamespace) -> Option<IggyPartition> { + // TODO(hubcio): consider adding reverse map `LocalIdx → IggyNamespace` for O(1) + // updates, or use a different data structure (e.g., slotmap) if removal is frequent. + + let local_idx = self.namespace_to_local.remove(namespace)?; + let idx = *local_idx; + + if idx >= self.partitions.len() { + return None; + } + + // Swap-remove for O(1) deletion + let partition = self.partitions.swap_remove(idx); + + // If we swapped an element, update its index in the map + if idx < self.partitions.len() { + // Find the namespace that was at the last position (now at idx) + for (_ns, lidx) in self.namespace_to_local.iter_mut() { + if **lidx == self.partitions.len() { + *lidx = LocalIdx::new(idx); + break; + } + } + } + + Some(partition) + } +} + +/// TODO(hubcio): actual logic will be added when integrating with storage. +impl Partitions for IggyPartitions { + type MessageBatch = (); + type MessageBatchSet = (); + + async fn poll_messages( + &self, + _namespace: &IggyNamespace, + _local_idx: LocalIdx, + _consumer: PollingConsumer, + _args: PollingArgs, + ) -> Result<(PollMetadata, Self::MessageBatchSet), IggyError> { + Ok((PollMetadata::new(0, 0), ())) + } + + async fn send_messages( + &self, + _namespace: &IggyNamespace, + _local_idx: LocalIdx, + _batch: Self::MessageBatch, + ) -> Result<SendMessagesResult, IggyError> { + Ok(SendMessagesResult { messages_count: 0 }) + } + + async fn create_partition(&self, _namespace: &IggyNamespace) -> Result<LocalIdx, IggyError> { + Err(IggyError::FeatureUnavailable) + } + + async fn delete_partitions( + &self, + _namespaces: &[IggyNamespace], + ) -> Result<Vec<LocalIdx>, IggyError> { + Err(IggyError::FeatureUnavailable) + } + + async fn get_consumer_offset( + &self, + _namespace: &IggyNamespace, + _local_idx: LocalIdx, + _consumer: PollingConsumer, + ) -> Result<Option<ConsumerOffsetInfo>, IggyError> { + Ok(None) + } + + async fn store_consumer_offset( + &self, + _namespace: &IggyNamespace, + _local_idx: LocalIdx, + _consumer: PollingConsumer, + _offset: u64, + ) -> Result<(), IggyError> { + Ok(()) + } + + async fn delete_consumer_offset( + &self, + _namespace: &IggyNamespace, + _local_idx: LocalIdx, + _consumer: PollingConsumer, + ) -> Result<(), IggyError> { + Ok(()) + } + + async fn flush_unsaved_buffer( + &self, + _namespace: &IggyNamespace, + _local_idx: LocalIdx, + _fsync: bool, + ) -> Result<(), IggyError> { + Ok(()) + } +} diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs new file mode 100644 index 000000000..2c1c13078 --- /dev/null +++ b/core/partitions/src/lib.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod iggy_partition; +mod iggy_partitions; +mod traits; + +pub use iggy_partition::IggyPartition; +pub use iggy_partitions::IggyPartitions; +pub use traits::{Partitions, PollMetadata, PollingArgs, PollingConsumer, SendMessagesResult}; diff --git a/core/partitions/src/traits.rs b/core/partitions/src/traits.rs new file mode 100644 index 000000000..140fd39fc --- /dev/null +++ b/core/partitions/src/traits.rs @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iggy_common::sharding::{IggyNamespace, LocalIdx}; +use iggy_common::{ConsumerOffsetInfo, IggyError, PollingStrategy}; +use std::future::Future; + +/// Arguments for polling messages from a partition. +#[derive(Debug, Clone)] +pub struct PollingArgs { + pub strategy: PollingStrategy, + pub count: u32, + pub auto_commit: bool, +} + +impl PollingArgs { + pub fn new(strategy: PollingStrategy, count: u32, auto_commit: bool) -> Self { + Self { + strategy, + count, + auto_commit, + } + } +} + +/// Metadata returned from a poll operation. +#[derive(Debug, Clone)] +pub struct PollMetadata { + pub partition_id: u32, + pub current_offset: u64, +} + +impl PollMetadata { + pub fn new(partition_id: u32, current_offset: u64) -> Self { + Self { + partition_id, + current_offset, + } + } +} + +/// Result of sending messages. +#[derive(Debug)] +pub struct SendMessagesResult { + pub messages_count: u32, +} + +/// Consumer identification for offset operations. +// TODO(hubcio): unify with server's `PollingConsumer` in `streaming/polling_consumer.rs`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PollingConsumer { + /// Regular consumer with (consumer_id, partition_id) + Consumer(usize, usize), + /// Consumer group with (group_id, member_id) + ConsumerGroup(usize, usize), +} + +/// The core abstraction for partition operations in clustering. +/// +/// This trait defines the data-plane operations for partitions that +/// need to be coordinated across a cluster using viewstamped replication. +/// Implementations can vary between single-node and clustered deployments. +pub trait Partitions { + /// Message batch type for sending messages. + type MessageBatch; + /// Message batch set type for poll results. + type MessageBatchSet; + + /// Poll messages from a partition. + fn poll_messages( + &self, + namespace: &IggyNamespace, + local_idx: LocalIdx, + consumer: PollingConsumer, + args: PollingArgs, + ) -> impl Future<Output = Result<(PollMetadata, Self::MessageBatchSet), IggyError>> + Send; + + /// Send/append messages to a partition. + fn send_messages( + &self, + namespace: &IggyNamespace, + local_idx: LocalIdx, + batch: Self::MessageBatch, + ) -> impl Future<Output = Result<SendMessagesResult, IggyError>> + Send; + + /// Create a new partition. + fn create_partition( + &self, + namespace: &IggyNamespace, + ) -> impl Future<Output = Result<LocalIdx, IggyError>> + Send; + + /// Delete partitions from the collection. + fn delete_partitions( + &self, + namespaces: &[IggyNamespace], + ) -> impl Future<Output = Result<Vec<LocalIdx>, IggyError>> + Send; + + /// Get the stored offset for a consumer on a partition. + fn get_consumer_offset( + &self, + namespace: &IggyNamespace, + local_idx: LocalIdx, + consumer: PollingConsumer, + ) -> impl Future<Output = Result<Option<ConsumerOffsetInfo>, IggyError>> + Send; + + /// Store/update the offset for a consumer on a partition. + fn store_consumer_offset( + &self, + namespace: &IggyNamespace, + local_idx: LocalIdx, + consumer: PollingConsumer, + offset: u64, + ) -> impl Future<Output = Result<(), IggyError>> + Send; + + /// Delete the stored offset for a consumer on a partition. + fn delete_consumer_offset( + &self, + namespace: &IggyNamespace, + local_idx: LocalIdx, + consumer: PollingConsumer, + ) -> impl Future<Output = Result<(), IggyError>> + Send; + + /// Flush unsaved messages to disk. + fn flush_unsaved_buffer( + &self, + namespace: &IggyNamespace, + local_idx: LocalIdx, + fsync: bool, + ) -> impl Future<Output = Result<(), IggyError>> + Send; +} diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 23e7c515d..869f0b9a7 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -23,6 +23,7 @@ use dashmap::DashMap; use dotenvy::dotenv; use err_trail::ErrContext; use figlet_rs::FIGfont; +use iggy_common::sharding::{LocalIdx, ShardLocation}; use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError, MemoryPool}; use server::SEMANTIC_VERSION; use server::args::Args; @@ -314,7 +315,7 @@ fn main() -> Result<(), ServerError> { // Shared resources bootstrap. let shards_table = Box::new(DashMap::with_capacity(SHARDS_TABLE_CAPACITY)); let shards_table = Box::leak(shards_table); - let shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>> = shards_table.into(); + let shards_table: EternalPtr<DashMap<IggyNamespace, ShardLocation>> = shards_table.into(); let client_manager = Box::new(DashMap::new()); let client_manager = Box::leak(client_manager); @@ -338,7 +339,9 @@ fn main() -> Result<(), ServerError> { &ns, shard_assignment.len() as u32, )); - shards_table.insert(ns, shard_id); + // LocalIdx is a placeholder until IggyPartitions integration + let location = ShardLocation::new(shard_id, LocalIdx::new(0)); + shards_table.insert(ns, location); } }); } diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 188452fe8..62032b32f 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -18,7 +18,7 @@ use super::{ IggyShard, TaskRegistry, transmission::connector::ShardConnector, - transmission::frame::ShardFrame, transmission::id::ShardId, + transmission::frame::ShardFrame, }; use crate::{ configs::server::ServerConfig, @@ -33,13 +33,14 @@ use crate::{ }; use dashmap::DashMap; use iggy_common::EncryptorKind; +use iggy_common::sharding::ShardLocation; use std::{cell::Cell, rc::Rc, sync::atomic::AtomicBool}; #[derive(Default)] pub struct IggyShardBuilder { id: Option<u16>, streams: Option<Streams>, - shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardId>>>, + shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardLocation>>>, state: Option<FileState>, users: Option<Users>, client_manager: Option<ClientManager>, @@ -69,7 +70,7 @@ impl IggyShardBuilder { pub fn shards_table( mut self, - shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>>, + shards_table: EternalPtr<DashMap<IggyNamespace, ShardLocation>>, ) -> Self { self.shards_table = Some(shards_table); self diff --git a/core/server/src/shard/communication.rs b/core/server/src/shard/communication.rs index 59dc11183..e3c780c8f 100644 --- a/core/server/src/shard/communication.rs +++ b/core/server/src/shard/communication.rs @@ -22,13 +22,13 @@ use crate::shard::{ connector::ShardConnector, event::ShardEvent, frame::ShardFrame, - id::ShardId, message::{ShardMessage, ShardSendRequestResult}, }, }; use futures::future::join_all; use hash32::{Hasher, Murmur3Hasher}; use iggy_common::IggyError; +use iggy_common::sharding::ShardLocation; use std::hash::Hasher as _; use tracing::{error, info, warn}; @@ -138,48 +138,48 @@ impl IggyShard { } pub fn find_shard(&self, namespace: &IggyNamespace) -> Option<&ShardConnector<ShardFrame>> { - self.shards_table.get(namespace).map(|shard_id| { + self.shards_table.get(namespace).map(|location| { self.shards .iter() - .find(|shard| shard.id == shard_id.id()) + .find(|shard| shard.id == *location.shard_id) .expect("Shard not found in the shards table.") }) } - pub fn find_shard_table_record(&self, namespace: &IggyNamespace) -> Option<ShardId> { + pub fn find_shard_table_record(&self, namespace: &IggyNamespace) -> Option<ShardLocation> { self.shards_table.get(namespace).map(|entry| *entry) } - pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) -> ShardId { + pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) -> ShardLocation { self.shards_table .remove(namespace) - .map(|(_, shard_id)| shard_id) + .map(|(_, location)| location) .expect("remove_shard_table_record: namespace not found") } pub fn remove_shard_table_records( &self, namespaces: &[IggyNamespace], - ) -> Vec<(IggyNamespace, ShardId)> { + ) -> Vec<(IggyNamespace, ShardLocation)> { namespaces .iter() .map(|ns| { - let (ns, shard_id) = self.shards_table.remove(ns).unwrap(); - (ns, shard_id) + let (ns, location) = self.shards_table.remove(ns).unwrap(); + (ns, location) }) .collect() } - pub fn insert_shard_table_record(&self, ns: IggyNamespace, shard_id: ShardId) { - self.shards_table.insert(ns, shard_id); + pub fn insert_shard_table_record(&self, ns: IggyNamespace, location: ShardLocation) { + self.shards_table.insert(ns, location); } pub fn get_current_shard_namespaces(&self) -> Vec<IggyNamespace> { self.shards_table .iter() .filter_map(|entry| { - let (ns, shard_id) = entry.pair(); - if shard_id.id() == self.id { + let (ns, location) = entry.pair(); + if *location.shard_id == self.id { Some(*ns) } else { None diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index f48279362..178fa6308 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -46,6 +46,7 @@ use crate::{ }; use builder::IggyShardBuilder; use dashmap::DashMap; +use iggy_common::sharding::ShardLocation; use iggy_common::{EncryptorKind, Identifier, IggyError}; use std::{ cell::{Cell, RefCell}, @@ -55,10 +56,7 @@ use std::{ time::{Duration, Instant}, }; use tracing::{debug, error, info, instrument}; -use transmission::{ - connector::{Receiver, ShardConnector, StopReceiver}, - id::ShardId, -}; +use transmission::connector::{Receiver, ShardConnector, StopReceiver}; pub const COMPONENT: &str = "SHARD"; pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); @@ -70,7 +68,7 @@ pub struct IggyShard { _version: SemanticVersion, pub(crate) streams: Streams, - pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>>, + pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardLocation>>, pub(crate) state: FileState, pub(crate) fs_locks: FsLocks, @@ -190,9 +188,9 @@ impl IggyShard { async fn load_segments(&self) -> Result<(), IggyError> { use crate::bootstrap::load_segments; for shard_entry in self.shards_table.iter() { - let (namespace, shard_id) = shard_entry.pair(); + let (namespace, location) = shard_entry.pair(); - if **shard_id == self.id { + if *location.shard_id == self.id { let stream_id = namespace.stream_id(); let topic_id: usize = namespace.topic_id(); let partition_id = namespace.partition_id(); diff --git a/core/server/src/shard/namespace.rs b/core/server/src/shard/namespace.rs index 44162e013..5b9dea4aa 100644 --- a/core/server/src/shard/namespace.rs +++ b/core/server/src/shard/namespace.rs @@ -16,44 +16,15 @@ * under the License. */ +// Re-export from common for backward compatibility +pub use iggy_common::sharding::{ + IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, PARTITION_BITS, PARTITION_MASK, + PARTITION_SHIFT, STREAM_BITS, STREAM_MASK, STREAM_SHIFT, TOPIC_BITS, TOPIC_MASK, TOPIC_SHIFT, +}; + use crate::slab::partitions; use iggy_common::Identifier; -// Packed namespace layout (works only on 64bit platforms, but we won't support 32bit anyway) -// +----------------+----------------+----------------+----------------+ -// | stream_id | topic_id | partition_id | unused | -// | STREAM_BITS | TOPIC_BITS | PARTITION_BITS | (64 - total) | -// +----------------+----------------+----------------+----------------+ - -// TODO Use consts from the `slab` module. -pub const MAX_STREAMS: usize = 4096; -pub const MAX_TOPICS: usize = 4096; -pub const MAX_PARTITIONS: usize = 1_000_000; - -const fn bits_required(mut n: u64) -> u32 { - if n == 0 { - return 1; - } - let mut b = 0; - while n > 0 { - b += 1; - n >>= 1; - } - b -} - -pub const STREAM_BITS: u32 = bits_required((MAX_STREAMS - 1) as u64); -pub const TOPIC_BITS: u32 = bits_required((MAX_TOPICS - 1) as u64); -pub const PARTITION_BITS: u32 = bits_required((MAX_PARTITIONS - 1) as u64); - -pub const PARTITION_SHIFT: u32 = 0; -pub const TOPIC_SHIFT: u32 = PARTITION_SHIFT + PARTITION_BITS; -pub const STREAM_SHIFT: u32 = TOPIC_SHIFT + TOPIC_BITS; - -pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 1; -pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1; -pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1; - #[derive(Debug)] pub struct IggyFullNamespace { stream: Identifier, @@ -86,36 +57,3 @@ impl IggyFullNamespace { (self.stream, self.topic, self.partition) } } - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct IggyNamespace(u64); - -impl IggyNamespace { - #[inline] - pub fn inner(&self) -> u64 { - self.0 - } - - #[inline] - pub fn stream_id(&self) -> usize { - ((self.0 >> STREAM_SHIFT) & STREAM_MASK) as usize - } - - #[inline] - pub fn topic_id(&self) -> usize { - ((self.0 >> TOPIC_SHIFT) & TOPIC_MASK) as usize - } - - #[inline] - pub fn partition_id(&self) -> usize { - ((self.0 >> PARTITION_SHIFT) & PARTITION_MASK) as usize - } - - #[inline] - pub fn new(stream: usize, topic: usize, partition: usize) -> Self { - let value = ((stream as u64) & STREAM_MASK) << STREAM_SHIFT - | ((topic as u64) & TOPIC_MASK) << TOPIC_SHIFT - | ((partition as u64) & PARTITION_MASK) << PARTITION_SHIFT; - Self(value) - } -} diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index b95847303..7e3608eb9 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -35,6 +35,7 @@ use crate::streaming::topics; use err_trail::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; +use iggy_common::sharding::{LocalIdx, ShardLocation}; use tracing::info; impl IggyShard { @@ -105,7 +106,10 @@ impl IggyShard { let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id); let shard_id = ShardId::new(calculate_shard_assignment(&ns, shards_count)); let is_current_shard = self.id == *shard_id; - self.insert_shard_table_record(ns, shard_id); + // TODO(hubcio): LocalIdx(0) is wrong.. When IggyPartitions is integrated into + // IggyShard, this should use the actual index returned by IggyPartitions::insert(). + let location = ShardLocation::new(shard_id, LocalIdx::new(0)); + self.insert_shard_table_record(ns, location); create_partition_file_hierarchy( numeric_stream_id as usize, @@ -204,11 +208,14 @@ impl IggyShard { "create_partitions_bypass_auth: partition mismatch ID, wrong creation order ?!" ); let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, id); - let shard_id = self.find_shard_table_record(&ns).unwrap_or_else(|| { + // TODO(hubcio): when IggyPartitions is integrated, this fallback path should + // either be removed or use proper index resolution. + let location = self.find_shard_table_record(&ns).unwrap_or_else(|| { tracing::warn!("WARNING: missing shard table record for namespace: {:?}, in the event handler for `CreatedPartitions` event.", ns); - ShardId::new(calculate_shard_assignment(&ns, shards_count)) + let shard_id = ShardId::new(calculate_shard_assignment(&ns, shards_count)); + ShardLocation::new(shard_id, LocalIdx::new(0)) }); - if self.id == *shard_id { + if self.id == *location.shard_id { self.init_log(stream_id, topic_id, id).await?; } } diff --git a/core/server/src/shard/transmission/id.rs b/core/server/src/shard/transmission/id.rs index ad16ff9ad..86d4443db 100644 --- a/core/server/src/shard/transmission/id.rs +++ b/core/server/src/shard/transmission/id.rs @@ -1,5 +1,4 @@ /* Licensed to the Apache Software Foundation (ASF) under one - * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -17,28 +16,5 @@ * under the License. */ -use std::ops::Deref; - -// TODO: Maybe pad to cache line size? -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -pub struct ShardId { - id: u16, -} - -impl ShardId { - pub fn new(id: u16) -> Self { - Self { id } - } - - pub fn id(&self) -> u16 { - self.id - } -} - -impl Deref for ShardId { - type Target = u16; - - fn deref(&self) -> &Self::Target { - &self.id - } -} +// Re-export from common for backward compatibility +pub use iggy_common::sharding::ShardId;
