This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d765fc441c84c4de40e6a9578fd2f510d63dd965 Author: numinex <[email protected]> AuthorDate: Sun May 11 17:17:51 2025 +0200 initial setup and migration of shards --- Cargo.lock | 91 +++++++++++++++- core/server/Cargo.toml | 2 + core/server/src/bootstrap.rs | 0 core/server/src/lib.rs | 2 + core/server/src/shard/connector.rs | 140 +++++++++++++++++++++++++ core/server/src/shard/frame.rs | 68 ++++++++++++ core/server/src/shard/mod.rs | 59 +++++++++++ core/server/src/{lib.rs => shard/namespace.rs} | 48 +++------ 8 files changed, 375 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0406b044..d9ba5d6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,7 +142,7 @@ dependencies = [ "actix-utils", "futures-core", "futures-util", - "mio", + "mio 1.0.3", "socket2", "tokio", "tracing", @@ -631,6 +631,17 @@ dependencies = [ "webpki-roots 0.26.11", ] +[[package]] +name = "auto-const-array" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd73835ad7deb4bd2b389e6f10333b143f025d607c55ca04c66a0bcc6bb2fc6d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -2646,6 +2657,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generator" version = "0.8.5" @@ -4088,6 +4108,16 @@ dependencies = [ "rustversion", ] +[[package]] +name = "io-uring" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -4703,6 +4733,37 @@ dependencies = [ "uuid", ] +[[package]] +name = "monoio" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bd0f8bcde87b1949f95338b547543fcab187bc7e7a5024247e359a5e828ba6a" +dependencies = [ + "auto-const-array", + "bytes", + "fxhash", + "io-uring", + "libc", + "memchr", + "mio 0.8.11", + "monoio-macros", + "nix 0.26.4", + "pin-project-lite", + "socket2", + "windows-sys 0.48.0", +] + +[[package]] +name = "monoio-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "nanorand" version = "0.7.0" @@ -4718,6 +4779,19 @@ version = "6.6.666" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6" +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", + "memoffset", + "pin-utils", +] + [[package]] name = "nix" version = "0.30.1" @@ -4782,7 +4856,7 @@ dependencies = [ "kqueue", "libc", "log", - "mio", + "mio 1.0.3", "notify-types", "walkdir", "windows-sys 0.59.0", @@ -6738,7 +6812,8 @@ dependencies = [ "mimalloc", "mockall", "moka", - "nix", + "monoio", + "nix 0.30.1", "once_cell", "opentelemetry", "opentelemetry-appender-tracing", @@ -6756,6 +6831,7 @@ dependencies = [ "serde", "serde_with", "serial_test", + "sharded_queue", "static-toml", "strum", "sysinfo 0.35.2", @@ -6807,6 +6883,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "sharded_queue" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3499be28bd82560e75ad10457698607ba3cc389175ab47ac93279834ce1fab4" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "shlex" version = "1.3.0" diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 34ab57fb..6a526df6 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -63,6 +63,7 @@ jsonwebtoken = "9.3.1" lending-iterator = "0.1.7" mimalloc = { workspace = true, optional = true } moka = { version = "0.12.10", features = ["future"] } +monoio = "0.2.4" nix = { version = "0.30", features = ["fs"] } once_cell = "1.21.3" opentelemetry = { version = "0.30.0", features = ["trace", "logs"] } @@ -97,6 +98,7 @@ rustls-pemfile = "2.2.0" serde = { workspace = true } serde_with = { workspace = true } static-toml = "1.3.0" +sharded_queue = "2.0.1" strum = { workspace = true } sysinfo = { workspace = true } tempfile = { workspace = true } diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs new file mode 100644 index 00000000..e69de29b diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index 91d17586..572301e0 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -26,12 +26,14 @@ static GLOBAL: MiMalloc = MiMalloc; #[cfg(windows)] compile_error!("iggy-server doesn't support windows."); +mod bootstrap; pub mod archiver; pub mod args; pub mod binary; pub mod channels; pub(crate) mod compat; pub mod configs; +pub mod shard; pub mod http; pub mod log; pub mod quic; diff --git a/core/server/src/shard/connector.rs b/core/server/src/shard/connector.rs new file mode 100644 index 00000000..349218ba --- /dev/null +++ b/core/server/src/shard/connector.rs @@ -0,0 +1,140 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use futures::{task::AtomicWaker, Stream}; +use sharded_queue::ShardedQueue; +use std::{ + sync::{atomic::AtomicUsize, Arc}, + task::Poll, +}; + +pub type StopSender = flume::Sender<()>; +pub type StopReceiver = flume::Receiver<()>; + +#[derive(Clone)] +pub struct ShardConnector<T> { + pub id: u16, + pub sender: Sender<T>, + pub receiver: Receiver<T>, + pub stop_receiver: StopReceiver, + pub stop_sender: StopSender, +} + +// TODO(numinex) - replace flume with async_channel +impl<T: Clone> ShardConnector<T> { + pub fn new(id: u16, max_concurrent_thread_count: usize) -> Self { + let channel = Arc::new(ShardedChannel::new(max_concurrent_thread_count)); + let (sender, receiver) = channel.unbounded(); + let (stop_sender, stop_receiver) = flume::bounded(1); + Self { + id, + receiver, + sender, + stop_receiver, + stop_sender, + } + } + + pub fn send(&self, data: T) { + self.sender.send(data); + } +} + +#[derive(Clone)] +pub struct Receiver<T> { + channel: Arc<ShardedChannel<T>>, +} + +#[derive(Clone)] +pub struct Sender<T> { + channel: Arc<ShardedChannel<T>>, +} + +impl<T> Sender<T> { + pub fn send(&self, data: T) { + self.channel + .task_queue + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.channel.queue.push_back(data); + self.channel.waker.wake(); + } +} + +pub struct ShardedChannel<T> { + queue: ShardedQueue<T>, + task_queue: AtomicUsize, + waker: AtomicWaker, +} + +impl<T> ShardedChannel<T> { + pub fn new(max_concurrent_thread_count: usize) -> Self { + let waker = AtomicWaker::new(); + + Self { + queue: ShardedQueue::new(max_concurrent_thread_count), + task_queue: AtomicUsize::new(0), + waker, + } + } +} + +pub trait ShardedChannelsSplit<T: Clone> { + fn unbounded(&self) -> (Sender<T>, Receiver<T>); + + fn sender(&self) -> Sender<T>; +} + +impl<T: Clone> ShardedChannelsSplit<T> for Arc<ShardedChannel<T>> { + fn unbounded(&self) -> (Sender<T>, Receiver<T>) { + let tx = self.sender(); + let rx = Receiver { + channel: Arc::clone(self), + }; + + (tx, rx) + } + + fn sender(&self) -> Sender<T> { + Sender { + channel: Arc::clone(self), + } + } +} + +impl<T> Stream for Receiver<T> { + type Item = T; + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<Self::Item>> { + let old = self + .channel + .task_queue + .load(std::sync::atomic::Ordering::Relaxed); + if old == 0 { + self.channel.waker.register(cx.waker()); + return Poll::Pending; + } + + assert!(old > 0); + self.channel + .task_queue + .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + let item = self.channel.queue.pop_front_or_spin_wait_item(); + Poll::Ready(Some(item)) + } +} diff --git a/core/server/src/shard/frame.rs b/core/server/src/shard/frame.rs new file mode 100644 index 00000000..b1524ad6 --- /dev/null +++ b/core/server/src/shard/frame.rs @@ -0,0 +1,68 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use bytes::Bytes; +use flume::Sender; +use iggy_common::IggyError; + +#[derive(Debug, Clone)] +pub enum ShardMessage { + //TODO: Fixme + //Command(ServerCommand), + Event(ShardEvent), +} + +#[derive(Debug, Clone)] +pub enum ShardEvent { +} + +#[derive(Debug)] +pub enum ShardResponse { + BinaryResponse(Bytes), + ErrorResponse(IggyError), +} + +#[derive(Debug, Clone)] +pub struct ShardFrame { + pub client_id: u32, + pub message: ShardMessage, + pub response_sender: Option<Sender<ShardResponse>>, +} + +impl ShardFrame { + pub fn new( + client_id: u32, + message: ShardMessage, + response_sender: Option<Sender<ShardResponse>>, + ) -> Self { + Self { + client_id, + message, + response_sender, + } + } +} + +#[macro_export] +macro_rules! handle_response { + ($sender:expr, $response:expr) => { + match $response { + ShardResponse::BinaryResponse(payload) => $sender.send_ok_response(&payload).await?, + ShardResponse::ErrorResponse(err) => $sender.send_error_response(err).await?, + } + }; +} diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs new file mode 100644 index 00000000..6ccbcab3 --- /dev/null +++ b/core/server/src/shard/mod.rs @@ -0,0 +1,59 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +mod connector; +mod namespace; +mod frame; + + + +use std::cell::RefCell; +use ahash::HashMap; +use connector::ShardConnector; +use frame::ShardFrame; +use namespace::IggyNamespace; +struct Shard { + id: u16, + connection: ShardConnector<ShardFrame>, +} + +struct ShardInfo { + id: u16, +} + +pub struct IggyShard { + pub id: u16, + shards: Vec<Shard>, + shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>, + + pub(crate) permissioner: RefCell<Permissioner>, + pub(crate) storage: Rc<SystemStorage>, + pub(crate) streams: RwLock<HashMap<u32, Stream>>, + pub(crate) streams_ids: RefCell<HashMap<String, u32>>, + pub(crate) users: RefCell<HashMap<UserId, User>>, + + // TODO - get rid of this dynamic dispatch. + pub(crate) state: Rc<FileState>, + pub(crate) encryptor: Option<Rc<dyn Encryptor>>, + pub(crate) config: ServerConfig, + pub(crate) client_manager: RefCell<ClientManager>, + pub(crate) active_sessions: RefCell<Vec<Session>>, + pub(crate) metrics: Metrics, + pub message_receiver: Cell<Option<Receiver<ShardFrame>>>, + stop_receiver: StopReceiver, + stop_sender: StopSender, +} \ No newline at end of file diff --git a/core/server/src/lib.rs b/core/server/src/shard/namespace.rs similarity index 51% copy from core/server/src/lib.rs copy to core/server/src/shard/namespace.rs index 91d17586..4b972125 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/shard/namespace.rs @@ -15,39 +15,23 @@ * specific language governing permissions and limitations * under the License. */ +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct IggyNamespace { + pub(crate) stream_id: u32, + pub(crate) topic_id: u32, + pub(crate) partition_id: u32, +} -#[cfg(not(feature = "disable-mimalloc"))] -use mimalloc::MiMalloc; - -#[cfg(not(feature = "disable-mimalloc"))] -#[global_allocator] -static GLOBAL: MiMalloc = MiMalloc; - -#[cfg(windows)] -compile_error!("iggy-server doesn't support windows."); - -pub mod archiver; -pub mod args; -pub mod binary; -pub mod channels; -pub(crate) mod compat; -pub mod configs; -pub mod http; -pub mod log; -pub mod quic; -pub mod server_error; -pub mod state; -pub mod streaming; -pub mod tcp; -pub mod versioning; - -const VERSION: &str = env!("CARGO_PKG_VERSION"); -const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME"; -const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD"; +impl IggyNamespace { + pub fn new(stream_id: u32, topic_id: u32, partition_id: u32) -> Self { + Self { + stream_id, + topic_id, + partition_id, + } + } -pub(crate) fn map_toggle_str<'a>(enabled: bool) -> &'a str { - match enabled { - true => "enabled", - false => "disabled", + pub fn generate_hash(&self) -> u32 { + todo!(); } }
