This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit d765fc441c84c4de40e6a9578fd2f510d63dd965
Author: numinex <[email protected]>
AuthorDate: Sun May 11 17:17:51 2025 +0200

    initial setup and migration of shards
---
 Cargo.lock                                     |  91 +++++++++++++++-
 core/server/Cargo.toml                         |   2 +
 core/server/src/bootstrap.rs                   |   0
 core/server/src/lib.rs                         |   2 +
 core/server/src/shard/connector.rs             | 140 +++++++++++++++++++++++++
 core/server/src/shard/frame.rs                 |  68 ++++++++++++
 core/server/src/shard/mod.rs                   |  59 +++++++++++
 core/server/src/{lib.rs => shard/namespace.rs} |  48 +++------
 8 files changed, 375 insertions(+), 35 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0406b044..d9ba5d6a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -142,7 +142,7 @@ dependencies = [
  "actix-utils",
  "futures-core",
  "futures-util",
- "mio",
+ "mio 1.0.3",
  "socket2",
  "tokio",
  "tracing",
@@ -631,6 +631,17 @@ dependencies = [
  "webpki-roots 0.26.11",
 ]
 
+[[package]]
+name = "auto-const-array"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fd73835ad7deb4bd2b389e6f10333b143f025d607c55ca04c66a0bcc6bb2fc6d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "autocfg"
 version = "1.5.0"
@@ -2646,6 +2657,15 @@ dependencies = [
  "slab",
 ]
 
+[[package]]
+name = "fxhash"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
+dependencies = [
+ "byteorder",
+]
+
 [[package]]
 name = "generator"
 version = "0.8.5"
@@ -4088,6 +4108,16 @@ dependencies = [
  "rustversion",
 ]
 
+[[package]]
+name = "io-uring"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5"
+dependencies = [
+ "bitflags 1.3.2",
+ "libc",
+]
+
 [[package]]
 name = "ipnet"
 version = "2.11.0"
@@ -4703,6 +4733,37 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "monoio"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3bd0f8bcde87b1949f95338b547543fcab187bc7e7a5024247e359a5e828ba6a"
+dependencies = [
+ "auto-const-array",
+ "bytes",
+ "fxhash",
+ "io-uring",
+ "libc",
+ "memchr",
+ "mio 0.8.11",
+ "monoio-macros",
+ "nix 0.26.4",
+ "pin-project-lite",
+ "socket2",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
+name = "monoio-macros"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "nanorand"
 version = "0.7.0"
@@ -4718,6 +4779,19 @@ version = "6.6.666"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6"
 
+[[package]]
+name = "nix"
+version = "0.26.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
+dependencies = [
+ "bitflags 1.3.2",
+ "cfg-if",
+ "libc",
+ "memoffset",
+ "pin-utils",
+]
+
 [[package]]
 name = "nix"
 version = "0.30.1"
@@ -4782,7 +4856,7 @@ dependencies = [
  "kqueue",
  "libc",
  "log",
- "mio",
+ "mio 1.0.3",
  "notify-types",
  "walkdir",
  "windows-sys 0.59.0",
@@ -6738,7 +6812,8 @@ dependencies = [
  "mimalloc",
  "mockall",
  "moka",
- "nix",
+ "monoio",
+ "nix 0.30.1",
  "once_cell",
  "opentelemetry",
  "opentelemetry-appender-tracing",
@@ -6756,6 +6831,7 @@ dependencies = [
  "serde",
  "serde_with",
  "serial_test",
+ "sharded_queue",
  "static-toml",
  "strum",
  "sysinfo 0.35.2",
@@ -6807,6 +6883,15 @@ dependencies = [
  "lazy_static",
 ]
 
+[[package]]
+name = "sharded_queue"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a3499be28bd82560e75ad10457698607ba3cc389175ab47ac93279834ce1fab4"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "shlex"
 version = "1.3.0"
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 34ab57fb..6a526df6 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -63,6 +63,7 @@ jsonwebtoken = "9.3.1"
 lending-iterator = "0.1.7"
 mimalloc = { workspace = true, optional = true }
 moka = { version = "0.12.10", features = ["future"] }
+monoio = "0.2.4"
 nix = { version = "0.30", features = ["fs"] }
 once_cell = "1.21.3"
 opentelemetry = { version = "0.30.0", features = ["trace", "logs"] }
@@ -97,6 +98,7 @@ rustls-pemfile = "2.2.0"
 serde = { workspace = true }
 serde_with = { workspace = true }
 static-toml = "1.3.0"
+sharded_queue = "2.0.1"
 strum = { workspace = true }
 sysinfo = { workspace = true }
 tempfile = { workspace = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
new file mode 100644
index 00000000..e69de29b
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 91d17586..572301e0 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -26,12 +26,14 @@ static GLOBAL: MiMalloc = MiMalloc;
 #[cfg(windows)]
 compile_error!("iggy-server doesn't support windows.");
 
+mod bootstrap;
 pub mod archiver;
 pub mod args;
 pub mod binary;
 pub mod channels;
 pub(crate) mod compat;
 pub mod configs;
+pub mod shard;
 pub mod http;
 pub mod log;
 pub mod quic;
diff --git a/core/server/src/shard/connector.rs b/core/server/src/shard/connector.rs
new file mode 100644
index 00000000..349218ba
--- /dev/null
+++ b/core/server/src/shard/connector.rs
@@ -0,0 +1,140 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use futures::{task::AtomicWaker, Stream};
+use sharded_queue::ShardedQueue;
+use std::{
+    sync::{atomic::AtomicUsize, Arc},
+    task::Poll,
+};
+
+pub type StopSender = flume::Sender<()>;
+pub type StopReceiver = flume::Receiver<()>;
+
+#[derive(Clone)]
+pub struct ShardConnector<T> {
+    pub id: u16,
+    pub sender: Sender<T>,
+    pub receiver: Receiver<T>,
+    pub stop_receiver: StopReceiver,
+    pub stop_sender: StopSender,
+}
+
+// TODO(numinex) - replace flume with async_channel
+impl<T: Clone> ShardConnector<T> {
+    pub fn new(id: u16, max_concurrent_thread_count: usize) -> Self {
+        let channel = Arc::new(ShardedChannel::new(max_concurrent_thread_count));
+        let (sender, receiver) = channel.unbounded();
+        let (stop_sender, stop_receiver) = flume::bounded(1);
+        Self {
+            id,
+            receiver,
+            sender,
+            stop_receiver,
+            stop_sender,
+        }
+    }
+
+    pub fn send(&self, data: T) {
+        self.sender.send(data);
+    }
+}
+
+#[derive(Clone)]
+pub struct Receiver<T> {
+    channel: Arc<ShardedChannel<T>>,
+}
+
+#[derive(Clone)]
+pub struct Sender<T> {
+    channel: Arc<ShardedChannel<T>>,
+}
+
+impl<T> Sender<T> {
+    pub fn send(&self, data: T) {
+        self.channel
+            .task_queue
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        self.channel.queue.push_back(data);
+        self.channel.waker.wake();
+    }
+}
+
+pub struct ShardedChannel<T> {
+    queue: ShardedQueue<T>,
+    task_queue: AtomicUsize,
+    waker: AtomicWaker,
+}
+
+impl<T> ShardedChannel<T> {
+    pub fn new(max_concurrent_thread_count: usize) -> Self {
+        let waker = AtomicWaker::new();
+
+        Self {
+            queue: ShardedQueue::new(max_concurrent_thread_count),
+            task_queue: AtomicUsize::new(0),
+            waker,
+        }
+    }
+}
+
+pub trait ShardedChannelsSplit<T: Clone> {
+    fn unbounded(&self) -> (Sender<T>, Receiver<T>);
+
+    fn sender(&self) -> Sender<T>;
+}
+
+impl<T: Clone> ShardedChannelsSplit<T> for Arc<ShardedChannel<T>> {
+    fn unbounded(&self) -> (Sender<T>, Receiver<T>) {
+        let tx = self.sender();
+        let rx = Receiver {
+            channel: Arc::clone(self),
+        };
+
+        (tx, rx)
+    }
+
+    fn sender(&self) -> Sender<T> {
+        Sender {
+            channel: Arc::clone(self),
+        }
+    }
+}
+
+impl<T> Stream for Receiver<T> {
+    type Item = T;
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let old = self
+            .channel
+            .task_queue
+            .load(std::sync::atomic::Ordering::Relaxed);
+        if old == 0 {
+            self.channel.waker.register(cx.waker());
+            return Poll::Pending;
+        }
+
+        assert!(old > 0);
+        self.channel
+            .task_queue
+            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+        let item = self.channel.queue.pop_front_or_spin_wait_item();
+        Poll::Ready(Some(item))
+    }
+}
diff --git a/core/server/src/shard/frame.rs b/core/server/src/shard/frame.rs
new file mode 100644
index 00000000..b1524ad6
--- /dev/null
+++ b/core/server/src/shard/frame.rs
@@ -0,0 +1,68 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use bytes::Bytes;
+use flume::Sender;
+use iggy_common::IggyError;
+
+#[derive(Debug, Clone)]
+pub enum ShardMessage {
+    //TODO: Fixme
+    //Command(ServerCommand),
+    Event(ShardEvent),
+}
+
+#[derive(Debug, Clone)]
+pub enum ShardEvent {
+}
+
+#[derive(Debug)]
+pub enum ShardResponse {
+    BinaryResponse(Bytes),
+    ErrorResponse(IggyError),
+}
+
+#[derive(Debug, Clone)]
+pub struct ShardFrame {
+    pub client_id: u32,
+    pub message: ShardMessage,
+    pub response_sender: Option<Sender<ShardResponse>>,
+}
+
+impl ShardFrame {
+    pub fn new(
+        client_id: u32,
+        message: ShardMessage,
+        response_sender: Option<Sender<ShardResponse>>,
+    ) -> Self {
+        Self {
+            client_id,
+            message,
+            response_sender,
+        }
+    }
+}
+
+#[macro_export]
+macro_rules! handle_response {
+    ($sender:expr, $response:expr) => {
+        match $response {
+            ShardResponse::BinaryResponse(payload) => $sender.send_ok_response(&payload).await?,
+            ShardResponse::ErrorResponse(err) => $sender.send_error_response(err).await?,
+        }
+    };
+}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
new file mode 100644
index 00000000..6ccbcab3
--- /dev/null
+++ b/core/server/src/shard/mod.rs
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+mod connector;
+mod namespace;
+mod frame;
+
+
+
+use std::cell::RefCell;
+use ahash::HashMap;
+use connector::ShardConnector;
+use frame::ShardFrame;
+use namespace::IggyNamespace;
+struct Shard {
+    id: u16,
+    connection: ShardConnector<ShardFrame>,
+}
+
+struct ShardInfo {
+    id: u16,
+}
+
+pub struct IggyShard {
+    pub id: u16,
+    shards: Vec<Shard>,
+    shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>,
+
+    pub(crate) permissioner: RefCell<Permissioner>,
+    pub(crate) storage: Rc<SystemStorage>,
+    pub(crate) streams: RwLock<HashMap<u32, Stream>>,
+    pub(crate) streams_ids: RefCell<HashMap<String, u32>>,
+    pub(crate) users: RefCell<HashMap<UserId, User>>,
+
+    // TODO - get rid of this dynamic dispatch.
+    pub(crate) state: Rc<FileState>,
+    pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
+    pub(crate) config: ServerConfig,
+    pub(crate) client_manager: RefCell<ClientManager>,
+    pub(crate) active_sessions: RefCell<Vec<Session>>,
+    pub(crate) metrics: Metrics,
+    pub message_receiver: Cell<Option<Receiver<ShardFrame>>>,
+    stop_receiver: StopReceiver,
+    stop_sender: StopSender,
+}
\ No newline at end of file
diff --git a/core/server/src/lib.rs b/core/server/src/shard/namespace.rs
similarity index 51%
copy from core/server/src/lib.rs
copy to core/server/src/shard/namespace.rs
index 91d17586..4b972125 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/shard/namespace.rs
@@ -15,39 +15,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+pub struct IggyNamespace {
+    pub(crate) stream_id: u32,
+    pub(crate) topic_id: u32,
+    pub(crate) partition_id: u32,
+}
 
-#[cfg(not(feature = "disable-mimalloc"))]
-use mimalloc::MiMalloc;
-
-#[cfg(not(feature = "disable-mimalloc"))]
-#[global_allocator]
-static GLOBAL: MiMalloc = MiMalloc;
-
-#[cfg(windows)]
-compile_error!("iggy-server doesn't support windows.");
-
-pub mod archiver;
-pub mod args;
-pub mod binary;
-pub mod channels;
-pub(crate) mod compat;
-pub mod configs;
-pub mod http;
-pub mod log;
-pub mod quic;
-pub mod server_error;
-pub mod state;
-pub mod streaming;
-pub mod tcp;
-pub mod versioning;
-
-const VERSION: &str = env!("CARGO_PKG_VERSION");
-const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME";
-const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD";
+impl IggyNamespace {
+    pub fn new(stream_id: u32, topic_id: u32, partition_id: u32) -> Self {
+        Self {
+            stream_id,
+            topic_id,
+            partition_id,
+        }
+    }
 
-pub(crate) fn map_toggle_str<'a>(enabled: bool) -> &'a str {
-    match enabled {
-        true => "enabled",
-        false => "disabled",
+    pub fn generate_hash(&self) -> u32 {
+        todo!();
     }
 }

Reply via email to