This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 89b8b70f feat(io_uring): migrate background tasks to new task runner 
(#2181)
89b8b70f is described below

commit 89b8b70f11a91502cd3ae41b46c59f75e984f0a2
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Sep 19 14:17:42 2025 +0200

    feat(io_uring): migrate background tasks to new task runner (#2181)
---
 core/server/src/channels/commands/archive_state.rs | 143 ---------------------
 .../commands/clean_personal_access_tokens.rs       | 127 +++++-------------
 core/server/src/channels/commands/mod.rs           |   1 -
 core/server/src/channels/commands/print_sysinfo.rs |  97 +++-----------
 core/server/src/channels/commands/save_messages.rs | 142 ++++++++------------
 .../src/channels/commands/verify_heartbeats.rs     | 123 ++++--------------
 core/server/src/channels/handler.rs                |  46 -------
 core/server/src/channels/mod.rs                    |   2 -
 core/server/src/channels/server_command.rs         |  39 ------
 core/server/src/main.rs                            |  28 +---
 core/server/src/quic/listener.rs                   |   4 +-
 core/server/src/shard/mod.rs                       |  38 +++++-
 core/server/src/shard/system/clients.rs            |   2 +-
 core/server/src/slab/streams.rs                    |  50 ++++---
 core/server/src/streaming/partitions/helpers.rs    |   5 -
 15 files changed, 192 insertions(+), 655 deletions(-)

diff --git a/core/server/src/channels/commands/archive_state.rs 
b/core/server/src/channels/commands/archive_state.rs
deleted file mode 100644
index f42393ae..00000000
--- a/core/server/src/channels/commands/archive_state.rs
+++ /dev/null
@@ -1,143 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-//TODO: Fixme
-/*
-use crate::channels::server_command::BackgroundServerCommand;
-use crate::configs::server::StateMaintenanceConfig;
-use crate::streaming::systems::system::SharedSystem;
-use flume::Sender;
-use iggy_common::IggyDuration;
-use iggy_common::IggyTimestamp;
-use tokio::time;
-use tracing::{error, info, instrument, warn};
-
-pub struct StateArchiver {
-    enabled: bool,
-    overwrite: bool,
-    interval: IggyDuration,
-    sender: Sender<ArchiveStateCommand>,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct ArchiveStateCommand {
-    overwrite: bool,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct ArchiveStateExecutor;
-
-impl StateArchiver {
-    pub fn new(config: &StateMaintenanceConfig, sender: 
Sender<ArchiveStateCommand>) -> Self {
-        Self {
-            enabled: config.archiver_enabled,
-            overwrite: config.overwrite,
-            interval: config.interval,
-            sender,
-        }
-    }
-
-    pub fn start(&self) {
-        if !self.enabled {
-            info!("State archiver is disabled.");
-            return;
-        }
-
-        let overwrite = self.overwrite;
-        let interval = self.interval;
-        let sender = self.sender.clone();
-        info!("State archiver is enabled, state will be archived every: 
{interval}.");
-        tokio::spawn(async move {
-            let mut interval_timer = time::interval(interval.get_duration());
-            loop {
-                interval_timer.tick().await;
-                sender
-                    .send(ArchiveStateCommand { overwrite })
-                    .unwrap_or_else(|err| {
-                        error!("Failed to send ArchiveStateCommand. Error: 
{}", err);
-                    });
-            }
-        });
-    }
-}
-
-impl BackgroundServerCommand<ArchiveStateCommand> for ArchiveStateExecutor {
-    #[instrument(skip_all, name = "trace_archive_state")]
-    async fn execute(&mut self, system: &SharedSystem, command: 
ArchiveStateCommand) {
-        let system = system.read().await;
-        if system.archiver.is_none() {
-            warn!("Archiver is disabled, state will not be archived.");
-            return;
-        }
-
-        let base_directory = if command.overwrite {
-            None
-        } else {
-            Some(format!("{}_state", IggyTimestamp::now().as_micros()))
-        };
-        let state_messages_file_path = 
system.config.get_state_messages_file_path();
-        let state_info_path = system.config.get_state_info_path();
-        info!("Archiving state...");
-        let archiver = system.archiver.as_ref().unwrap();
-        let files = [state_info_path.as_ref(), 
state_messages_file_path.as_ref()];
-        if let Err(error) = archiver.archive(&files, base_directory).await {
-            error!("Failed to archive state. Error: {}", error);
-            return;
-        }
-        info!("State archived successfully.");
-    }
-
-    fn start_command_sender(
-        &mut self,
-        _system: SharedSystem,
-        config: &crate::configs::server::ServerConfig,
-        sender: Sender<ArchiveStateCommand>,
-    ) {
-        if !config.data_maintenance.archiver.enabled
-            || !config.data_maintenance.state.archiver_enabled
-        {
-            return;
-        }
-
-        let state_archiver = 
StateArchiver::new(&config.data_maintenance.state, sender);
-        state_archiver.start();
-    }
-
-    fn start_command_consumer(
-        mut self,
-        system: SharedSystem,
-        config: &crate::configs::server::ServerConfig,
-        receiver: flume::Receiver<ArchiveStateCommand>,
-    ) {
-        if !config.data_maintenance.archiver.enabled
-            || !config.data_maintenance.state.archiver_enabled
-        {
-            return;
-        }
-
-        tokio::spawn(async move {
-            let system = system.clone();
-            while let Ok(command) = receiver.recv_async().await {
-                self.execute(&system, command).await;
-            }
-            info!("State archiver receiver stopped.");
-        });
-    }
-}
-
-*/
diff --git a/core/server/src/channels/commands/clean_personal_access_tokens.rs 
b/core/server/src/channels/commands/clean_personal_access_tokens.rs
index fd2eaf11..12153356 100644
--- a/core/server/src/channels/commands/clean_personal_access_tokens.rs
+++ b/core/server/src/channels/commands/clean_personal_access_tokens.rs
@@ -16,77 +16,36 @@
  * under the License.
  */
 
-/*
-use crate::channels::server_command::BackgroundServerCommand;
-use crate::configs::server::PersonalAccessTokenCleanerConfig;
-use crate::streaming::systems::system::SharedSystem;
-use flume::Sender;
-use iggy_common::IggyDuration;
-use iggy_common::IggyTimestamp;
-use tokio::time;
-use tracing::{debug, error, info, instrument};
+use std::rc::Rc;
 
-pub struct PersonalAccessTokenCleaner {
-    enabled: bool,
-    interval: IggyDuration,
-    sender: Sender<CleanPersonalAccessTokensCommand>,
-}
+use crate::shard::IggyShard;
+use iggy_common::{IggyError, IggyTimestamp};
+use tracing::{debug, info, trace};
 
-#[derive(Debug, Default, Clone)]
-pub struct CleanPersonalAccessTokensCommand;
-
-#[derive(Debug, Default, Clone)]
-pub struct CleanPersonalAccessTokensExecutor;
-
-impl PersonalAccessTokenCleaner {
-    pub fn new(
-        config: &PersonalAccessTokenCleanerConfig,
-        sender: Sender<CleanPersonalAccessTokensCommand>,
-    ) -> Self {
-        Self {
-            enabled: config.enabled,
-            interval: config.interval,
-            sender,
-        }
+pub async fn clear_personal_access_tokens(shard: Rc<IggyShard>) -> Result<(), 
IggyError> {
+    let config = &shard.config.personal_access_token.cleaner;
+    if !config.enabled {
+        info!("Personal access token cleaner is disabled.");
+        return Ok(());
     }
 
-    pub fn start(&self) {
-        if !self.enabled {
-            info!("Personal access token cleaner is disabled.");
-            return;
-        }
+    info!(
+        "Personal access token cleaner is enabled, expired tokens will be 
deleted every: {}.",
+        config.interval
+    );
 
-        let interval = self.interval;
-        let sender = self.sender.clone();
-        info!(
-            "Personal access token cleaner is enabled, expired tokens will be 
deleted every: {interval}."
-        );
-        tokio::spawn(async move {
-            let mut interval_timer = time::interval(interval.get_duration());
-            loop {
-                interval_timer.tick().await;
-                sender
-                    .send(CleanPersonalAccessTokensCommand)
-                    .unwrap_or_else(|error| {
-                        error!(
-                            "Failed to send CleanPersonalAccessTokensCommand. 
Error: {}",
-                            error
-                        );
-                    });
-            }
-        });
-    }
-}
+    let interval = config.interval.get_duration();
+    let mut interval_timer = compio::time::interval(interval);
+
+    loop {
+        interval_timer.tick().await;
+        trace!("Cleaning expired personal access tokens...");
 
-impl BackgroundServerCommand<CleanPersonalAccessTokensCommand>
-    for CleanPersonalAccessTokensExecutor
-{
-    #[instrument(skip_all, name = "trace_clean_personal_access_tokens")]
-    async fn execute(&mut self, system: &SharedSystem, _command: 
CleanPersonalAccessTokensCommand) {
-        let system = system.read().await;
+        let users = shard.users.borrow();
         let now = IggyTimestamp::now();
         let mut deleted_tokens_count = 0;
-        for (_, user) in system.users.iter() {
+
+        for (_, user) in users.iter() {
             let expired_tokens = user
                 .personal_access_tokens
                 .iter()
@@ -96,45 +55,21 @@ impl 
BackgroundServerCommand<CleanPersonalAccessTokensCommand>
 
             for token in expired_tokens {
                 debug!(
-                    "Personal access token: {token} for user with ID: {} is 
expired.",
-                    user.id
+                    "Personal access token: {} for user with ID: {} is 
expired.",
+                    token, user.id
                 );
                 deleted_tokens_count += 1;
                 user.personal_access_tokens.remove(&token);
                 debug!(
-                    "Deleted personal access token: {token} for user with ID: 
{}.",
-                    user.id
+                    "Deleted personal access token: {} for user with ID: {}.",
+                    token, user.id
                 );
             }
         }
-        info!("Deleted {deleted_tokens_count} expired personal access 
tokens.");
-    }
-
-    fn start_command_sender(
-        &mut self,
-        _system: SharedSystem,
-        config: &crate::configs::server::ServerConfig,
-        sender: Sender<CleanPersonalAccessTokensCommand>,
-    ) {
-        let personal_access_token_cleaner =
-            
PersonalAccessTokenCleaner::new(&config.personal_access_token.cleaner, sender);
-        personal_access_token_cleaner.start();
-    }
 
-    fn start_command_consumer(
-        mut self,
-        system: SharedSystem,
-        _config: &crate::configs::server::ServerConfig,
-        receiver: flume::Receiver<CleanPersonalAccessTokensCommand>,
-    ) {
-        tokio::spawn(async move {
-            let system = system.clone();
-            while let Ok(command) = receiver.recv_async().await {
-                self.execute(&system, command).await;
-            }
-            info!("Personal access token cleaner receiver stopped.");
-        });
+        info!(
+            "Deleted {} expired personal access tokens.",
+            deleted_tokens_count
+        );
     }
-}
-
-*/
+}
\ No newline at end of file
diff --git a/core/server/src/channels/commands/mod.rs 
b/core/server/src/channels/commands/mod.rs
index 13b1bf91..0130170b 100644
--- a/core/server/src/channels/commands/mod.rs
+++ b/core/server/src/channels/commands/mod.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-pub mod archive_state;
 pub mod clean_personal_access_tokens;
 pub mod print_sysinfo;
 pub mod save_messages;
diff --git a/core/server/src/channels/commands/print_sysinfo.rs 
b/core/server/src/channels/commands/print_sysinfo.rs
index c0f2b1a2..05f68372 100644
--- a/core/server/src/channels/commands/print_sysinfo.rs
+++ b/core/server/src/channels/commands/print_sysinfo.rs
@@ -16,63 +16,33 @@
  * under the License.
  */
 
-// TODO: Fixme
-/*
-use crate::{
-    channels::server_command::BackgroundServerCommand,
-    configs::server::ServerConfig,
-    streaming::{systems::system::SharedSystem, utils::memory_pool},
-};
-use flume::{Receiver, Sender};
-use human_repr::HumanCount;
-use iggy_common::IggyDuration;
-use tokio::time::{self};
-use tracing::{error, info, warn};
-
-#[derive(Debug, Default, Clone)]
-pub struct SysInfoPrintCommand;
-
-pub struct SysInfoPrinter {
-    interval: IggyDuration,
-    sender: Sender<SysInfoPrintCommand>,
-}
-
-pub struct SysInfoPrintExecutor;
+use std::rc::Rc;
 
-impl SysInfoPrinter {
-    pub fn new(interval: IggyDuration, sender: Sender<SysInfoPrintCommand>) -> 
Self {
-        Self { interval, sender }
-    }
+use crate::shard::IggyShard;
+use crate::streaming::utils::memory_pool;
+use human_repr::HumanCount;
+use iggy_common::IggyError;
+use tracing::{error, info, trace};
 
-    pub fn start(&self) {
-        let interval = self.interval;
-        let sender = self.sender.clone();
-        if interval.is_zero() {
-            info!("SysInfoPrinter is disabled.");
-            return;
-        }
+pub async fn print_sys_info(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+    let config = &shard.config.system.logging;
+    let interval = config.sysinfo_print_interval;
 
-        info!("SysInfoPrinter is enabled, system information will be printed 
every {interval}.");
-        tokio::spawn(async move {
-            let mut interval_timer = time::interval(interval.get_duration());
-            loop {
-                interval_timer.tick().await;
-                let command = SysInfoPrintCommand {};
-                sender.send(command).unwrap_or_else(|e| {
-                    error!("Failed to send SysInfoPrintCommand. Error: {e}");
-                });
-            }
-        });
+    if interval.is_zero() {
+        info!("SysInfoPrinter is disabled.");
+        return Ok(());
     }
-}
+    info!("SysInfoPrinter is enabled, system information will be printed every 
{interval}.");
+    let mut interval_timer = compio::time::interval(interval.get_duration());
+    loop {
+        interval_timer.tick().await;
+        trace!("Printing system information...");
 
-impl BackgroundServerCommand<SysInfoPrintCommand> for SysInfoPrintExecutor {
-    async fn execute(&mut self, system: &SharedSystem, _command: 
SysInfoPrintCommand) {
-        let stats = match system.read().await.get_stats().await {
+        let stats = match shard.get_stats().await {
             Ok(stats) => stats,
             Err(e) => {
                 error!("Failed to get system information. Error: {e}");
-                return;
+                continue;
             }
         };
 
@@ -97,31 +67,4 @@ impl BackgroundServerCommand<SysInfoPrintCommand> for 
SysInfoPrintExecutor {
 
         memory_pool().log_stats();
     }
-
-    fn start_command_sender(
-        &mut self,
-        _system: SharedSystem,
-        config: &ServerConfig,
-        sender: Sender<SysInfoPrintCommand>,
-    ) {
-        let printer = 
SysInfoPrinter::new(config.system.logging.sysinfo_print_interval, sender);
-        printer.start();
-    }
-
-    fn start_command_consumer(
-        mut self,
-        system: SharedSystem,
-        _config: &ServerConfig,
-        receiver: Receiver<SysInfoPrintCommand>,
-    ) {
-        tokio::spawn(async move {
-            let system = system.clone();
-            while let Ok(command) = receiver.recv_async().await {
-                self.execute(&system, command).await;
-            }
-            warn!("Sysinfo printer stopped receiving commands.");
-        });
-    }
-}
-
-*/
+}
\ No newline at end of file
diff --git a/core/server/src/channels/commands/save_messages.rs 
b/core/server/src/channels/commands/save_messages.rs
index bf832762..51731042 100644
--- a/core/server/src/channels/commands/save_messages.rs
+++ b/core/server/src/channels/commands/save_messages.rs
@@ -16,107 +16,67 @@
  * under the License.
  */
 
-//Todo: Fixme
-/*
-use crate::channels::server_command::BackgroundServerCommand;
-use crate::configs::server::MessageSaverConfig;
-use crate::configs::server::ServerConfig;
-use crate::streaming::systems::system::SharedSystem;
-use flume::{Receiver, Sender};
-use iggy_common::IggyDuration;
-use tokio::time;
-use tracing::{error, info, instrument, warn};
+use crate::{shard::IggyShard, shard_info};
+use iggy_common::{Identifier, IggyError};
+use std::rc::Rc;
+use tracing::{error, info, trace};
 
-pub struct MessagesSaver {
-    enabled: bool,
-    enforce_fsync: bool,
-    interval: IggyDuration,
-    sender: Sender<SaveMessagesCommand>,
-}
+pub async fn save_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+    let config = &shard.config.message_saver;
+    if !config.enabled {
+        info!("Message saver is disabled.");
+        return Ok(());
+    }
 
-#[derive(Debug, Default, Clone)]
-pub struct SaveMessagesCommand {
-    pub enforce_fsync: bool,
-}
+    // TODO: Maybe we should get rid of it in order to not complicate, and use 
the fsync settings per partition from config.
+    let enforce_fsync = config.enforce_fsync;
+    let interval = config.interval;
+    info!(
+        "Message saver is enabled, buffered messages will be automatically 
saved every: {interval}, enforce fsync: {enforce_fsync}."
+    );
 
-#[derive(Debug, Default, Clone)]
-pub struct SaveMessagesExecutor;
+    let mut interval_timer = compio::time::interval(interval.get_duration());
+    loop {
+        interval_timer.tick().await;
+        trace!("Saving buffered messages...");
 
-impl MessagesSaver {
-    pub fn new(config: &MessageSaverConfig, sender: 
Sender<SaveMessagesCommand>) -> Self {
-        Self {
-            enabled: config.enabled,
-            enforce_fsync: config.enforce_fsync,
-            interval: config.interval,
-            sender,
-        }
-    }
+        let namespaces = shard.get_current_shard_namespaces();
+        let mut total_saved_messages = 0u32;
+        let reason = "background saver triggered".to_string();
 
-    pub fn start(&self) {
-        if !self.enabled {
-            info!("Message saver is disabled.");
-            return;
-        }
+        for ns in namespaces {
+            let stream_id = Identifier::numeric(ns.stream_id() as 
u32).unwrap();
+            let topic_id = Identifier::numeric(ns.topic_id() as u32).unwrap();
+            let partition_id = ns.partition_id();
 
-        let enforce_fsync = self.enforce_fsync;
-        let interval = self.interval;
-        let sender = self.sender.clone();
-        info!(
-            "Message saver is enabled, buffered messages will be automatically 
saved every: {interval}, enforce fsync: {enforce_fsync}."
-        );
-        tokio::spawn(async move {
-            let mut interval_timer = time::interval(interval.get_duration());
-            loop {
-                interval_timer.tick().await;
-                let command = SaveMessagesCommand { enforce_fsync };
-                sender.send(command).unwrap_or_else(|e| {
-                    error!("Failed to send SaveMessagesCommand. Error: {e}",);
-                });
-            }
-        });
-    }
-}
-
-impl BackgroundServerCommand<SaveMessagesCommand> for SaveMessagesExecutor {
-    #[instrument(skip_all, name = "trace_save_messages")]
-    async fn execute(&mut self, system: &SharedSystem, _command: 
SaveMessagesCommand) {
-        let saved_messages_count = 
system.read().await.persist_messages().await;
-        match saved_messages_count {
-            Ok(n) => {
-                if n > 0 {
-                    info!("Saved {n} buffered messages on disk.");
+            match shard
+                .streams2
+                .persist_messages(
+                    shard.id,
+                    &stream_id,
+                    &topic_id,
+                    partition_id,
+                    reason.clone(),
+                    &shard.config.system,
+                )
+                .await
+            {
+                Ok(batch_count) => {
+                    total_saved_messages += batch_count;
+                }
+                Err(err) => {
+                    error!(
+                        "Failed to save messages for partition {}: {}",
+                        partition_id, err
+                    );
                 }
-            }
-            Err(e) => {
-                error!("Couldn't save buffered messages on disk. Error: {e}");
             }
         }
-    }
 
-    fn start_command_sender(
-        &mut self,
-        _system: SharedSystem,
-        config: &ServerConfig,
-        sender: Sender<SaveMessagesCommand>,
-    ) {
-        let messages_saver = MessagesSaver::new(&config.message_saver, sender);
-        messages_saver.start();
-    }
+        if total_saved_messages > 0 {
+            shard_info!(shard.id, "Saved {} buffered messages on disk.", 
total_saved_messages);
+        }
 
-    fn start_command_consumer(
-        mut self,
-        system: SharedSystem,
-        _config: &ServerConfig,
-        receiver: Receiver<SaveMessagesCommand>,
-    ) {
-        tokio::spawn(async move {
-            let system = system.clone();
-            while let Ok(command) = receiver.recv_async().await {
-                self.execute(&system, command).await;
-            }
-            warn!("Server command handler stopped receiving commands.");
-        });
+        trace!("Finished saving buffered messages.");
     }
 }
-
-*/
diff --git a/core/server/src/channels/commands/verify_heartbeats.rs 
b/core/server/src/channels/commands/verify_heartbeats.rs
index 74eacabe..123c174f 100644
--- a/core/server/src/channels/commands/verify_heartbeats.rs
+++ b/core/server/src/channels/commands/verify_heartbeats.rs
@@ -16,88 +16,41 @@
  * under the License.
  */
 
-//TODO: Fixme
-/*
-use crate::channels::server_command::BackgroundServerCommand;
-use crate::configs::server::HeartbeatConfig;
-use crate::streaming::systems::system::SharedSystem;
-use flume::Sender;
-use iggy_common::IggyDuration;
-use iggy_common::IggyTimestamp;
-use iggy_common::locking::IggyRwLockFn;
-use tokio::time;
-use tracing::{debug, error, info, instrument, warn};
+use crate::shard::IggyShard;
+use iggy_common::{IggyDuration, IggyError, IggyTimestamp};
+use std::rc::Rc;
+use tracing::{debug, info, trace, warn};
 
 const MAX_THRESHOLD: f64 = 1.2;
 
-pub struct VerifyHeartbeats {
-    enabled: bool,
-    interval: IggyDuration,
-    sender: Sender<VerifyHeartbeatsCommand>,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct VerifyHeartbeatsCommand {
-    interval: IggyDuration,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct VerifyHeartbeatsExecutor;
-
-impl VerifyHeartbeats {
-    pub fn new(config: &HeartbeatConfig, sender: 
Sender<VerifyHeartbeatsCommand>) -> Self {
-        Self {
-            enabled: config.enabled,
-            interval: config.interval,
-            sender,
-        }
+pub async fn verify_heartbeats(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+    let config = &shard.config.heartbeat;
+    if !config.enabled {
+        info!("Heartbeats verification is disabled.");
+        return Ok(());
     }
 
-    pub fn start(&self) {
-        if !self.enabled {
-            info!("Heartbeats verification is disabled.");
-            return;
-        }
+    let interval = config.interval;
+    let max_interval = IggyDuration::from((MAX_THRESHOLD * 
interval.as_micros() as f64) as u64);
+    info!("Heartbeats will be verified every: {interval}. Max allowed 
interval: {max_interval}.");
 
-        let interval = self.interval;
-        let max_interval = IggyDuration::from((MAX_THRESHOLD * 
interval.as_micros() as f64) as u64);
-        let sender = self.sender.clone();
-        info!(
-            "Heartbeats will be verified every: {interval}. Max allowed 
interval: {max_interval}."
-        );
-        tokio::spawn(async move {
-            let mut interval_timer = time::interval(interval.get_duration());
-            loop {
-                interval_timer.tick().await;
-                debug!("Verifying heartbeats...");
-                sender
-                    .send(VerifyHeartbeatsCommand {
-                        interval: max_interval,
-                    })
-                    .unwrap_or_else(|error| {
-                        error!("Failed to send VerifyHeartbeats. Error: {}", 
error);
-                    });
-            }
-        });
-    }
-}
+    let mut interval_timer = compio::time::interval(interval.get_duration());
 
-impl BackgroundServerCommand<VerifyHeartbeatsCommand> for 
VerifyHeartbeatsExecutor {
-    #[instrument(skip_all, name = "trace_verify_heartbeats")]
-    async fn execute(&mut self, system: &SharedSystem, command: 
VerifyHeartbeatsCommand) {
-        let system = system.read().await;
-        let clients;
-        {
-            let client_manager = system.client_manager.read().await;
-            clients = client_manager.get_clients();
-        }
+    loop {
+        interval_timer.tick().await;
+        trace!("Verifying heartbeats...");
+
+        let clients = {
+            let client_manager = shard.client_manager.borrow();
+            client_manager.get_clients()
+        };
 
         let now = IggyTimestamp::now();
-        let heartbeat_to = IggyTimestamp::from(now.as_micros() - 
command.interval.as_micros());
+        let heartbeat_to = IggyTimestamp::from(now.as_micros() - 
max_interval.as_micros());
         debug!("Verifying heartbeats at: {now}, max allowed timestamp: 
{heartbeat_to}");
+
         let mut stale_clients = Vec::new();
         for client in clients {
-            let client = client.read().await;
             if client.last_heartbeat.as_micros() < heartbeat_to.as_micros() {
                 warn!(
                     "Stale client session: {}, last heartbeat at: {}, max 
allowed timestamp: {heartbeat_to}",
@@ -114,40 +67,14 @@ impl BackgroundServerCommand<VerifyHeartbeatsCommand> for 
VerifyHeartbeatsExecut
         }
 
         if stale_clients.is_empty() {
-            return;
+            continue;
         }
 
         let count = stale_clients.len();
         info!("Removing {count} stale clients...");
         for client_id in stale_clients {
-            system.delete_client(client_id).await;
+            shard.delete_client(client_id);
         }
         info!("Removed {count} stale clients.");
     }
-
-    fn start_command_sender(
-        &mut self,
-        _system: SharedSystem,
-        config: &crate::configs::server::ServerConfig,
-        sender: Sender<VerifyHeartbeatsCommand>,
-    ) {
-        let verify_heartbeats = VerifyHeartbeats::new(&config.heartbeat, 
sender);
-        verify_heartbeats.start();
-    }
-
-    fn start_command_consumer(
-        mut self,
-        system: SharedSystem,
-        _config: &crate::configs::server::ServerConfig,
-        receiver: flume::Receiver<VerifyHeartbeatsCommand>,
-    ) {
-        tokio::spawn(async move {
-            let system = system.clone();
-            while let Ok(command) = receiver.recv_async().await {
-                self.execute(&system, command).await;
-            }
-            info!("Heartbeats verifier receiver stopped.");
-        });
-    }
 }
-*/
diff --git a/core/server/src/channels/handler.rs 
b/core/server/src/channels/handler.rs
deleted file mode 100644
index ee44fd58..00000000
--- a/core/server/src/channels/handler.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-use std::rc::Rc;
-
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-use super::server_command::BackgroundServerCommand;
-use crate::{configs::server::ServerConfig, shard::IggyShard};
-
-pub struct BackgroundServerCommandHandler<'a> {
-    shard: Rc<IggyShard>,
-    config: &'a ServerConfig,
-}
-
-impl<'a> BackgroundServerCommandHandler<'a> {
-    pub fn new(shard: Rc<IggyShard>, config: &'a ServerConfig) -> Self {
-        Self { shard, config }
-    }
-
-    pub fn install_handler<C, E>(&mut self, mut executor: E) -> Self
-    where
-        E: BackgroundServerCommand<C> + Send + Sync + 'static,
-    {
-        let (sender, receiver) = flume::unbounded();
-        let shard = self.shard.clone();
-        executor.start_command_sender(shard.clone(), self.config, sender);
-        executor.start_command_consumer(shard.clone(), self.config, receiver);
-        Self {
-            shard,
-            config: self.config,
-        }
-    }
-}
diff --git a/core/server/src/channels/mod.rs b/core/server/src/channels/mod.rs
index c3da041b..d147f7e0 100644
--- a/core/server/src/channels/mod.rs
+++ b/core/server/src/channels/mod.rs
@@ -17,5 +17,3 @@
  */
 
 pub mod commands;
-pub mod handler;
-pub mod server_command;
diff --git a/core/server/src/channels/server_command.rs 
b/core/server/src/channels/server_command.rs
deleted file mode 100644
index ac490737..00000000
--- a/core/server/src/channels/server_command.rs
+++ /dev/null
@@ -1,39 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::{configs::server::ServerConfig, shard::IggyShard};
-use flume::{Receiver, Sender};
-use std::{future::Future, rc::Rc};
-
-pub trait BackgroundServerCommand<C> {
-    fn execute(&mut self, system: &IggyShard, command: C) -> impl 
Future<Output = ()>;
-
-    fn start_command_sender(
-        &mut self,
-        shard: Rc<IggyShard>,
-        config: &ServerConfig,
-        sender: Sender<C>,
-    );
-
-    fn start_command_consumer(
-        self,
-        shard: Rc<IggyShard>,
-        config: &ServerConfig,
-        receiver: Receiver<C>,
-    );
-}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 02ed206a..86d9576d 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -50,7 +50,9 @@ use server::server_error::{ConfigError, ServerError};
 use server::shard::namespace::IggyNamespace;
 use server::shard::system::info::SystemInfo;
 use server::shard::{IggyShard, ShardInfo, calculate_shard_assignment};
-use server::slab::traits_ext::{EntityComponentSystem, 
EntityComponentSystemMutCell, IntoComponents};
+use server::slab::traits_ext::{
+    EntityComponentSystem, EntityComponentSystemMutCell, IntoComponents,
+};
 use server::state::StateKind;
 use server::state::command::EntryCommand;
 use server::state::file::FileState;
@@ -270,7 +272,7 @@ async fn main() -> Result<(), ServerError> {
         ));
 
         // Ergh... I knew this will backfire to include `Log` as part of the 
`Partition` entity,
-        // We have to initialize with an default log with every partition, 
once we `Clone` the Streams / Topics / Partitions,
+        // We have to initialize with a default log for every partition, once 
we `Clone` the Streams / Topics / Partitions,
         // because `Clone` impl for `Partition` does not clone the actual log, 
just creates an empty one.
         streams.with_components(|components| {
             let (root, ..) = components.into_components();
@@ -356,7 +358,6 @@ async fn main() -> Result<(), ServerError> {
     #[cfg(not(feature = "disable-mimalloc"))]
     info!("Using mimalloc allocator");
 
-
     let system = SharedSystem::new(System::new(
         config.system.clone(),
         config.cluster.clone(),
@@ -365,28 +366,7 @@ async fn main() -> Result<(), ServerError> {
     ));
     */
 
-    // Workaround to ensure that the statistics are initialized before the 
server
-    // loads streams and starts accepting connections. This is necessary to
-    // have the correct statistics when the server starts.
-    //system.write().await.get_stats().await?;
-    //system.write().await.init().await?;
-
     /*
-    let _command_handler = BackgroundServerCommandHandler::new(system.clone(), 
&config)
-        .install_handler(SaveMessagesExecutor)
-        .install_handler(MaintainMessagesExecutor)
-        .install_handler(CleanPersonalAccessTokensExecutor)
-        .install_handler(SysInfoPrintExecutor)
-        .install_handler(VerifyHeartbeatsExecutor);
-
-    #[cfg(unix)]
-    let (mut ctrl_c, mut sigterm) = {
-        use tokio::signal::unix::{SignalKind, signal};
-        (
-            signal(SignalKind::interrupt())?,
-            signal(SignalKind::terminate())?,
-        )
-    };
 
     let mut current_config = config.clone();
 
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index f9dfa471..a2d00eda 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -118,12 +118,12 @@ async fn accept_stream(
     match connection.accept_bi().await {
         Err(compio_quic::ConnectionError::ApplicationClosed { .. }) => {
             info!("Connection closed");
-            shard.delete_client(client_id).await;
+            shard.delete_client(client_id);
             Ok(None)
         }
         Err(error) => {
             error!("Error when handling QUIC stream: {:?}", error);
-            shard.delete_client(client_id).await;
+            shard.delete_client(client_id);
             Err(error.into())
         }
         Ok(stream) => Ok(Some(stream)),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index e64cafb6..84b038ca 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -153,7 +153,7 @@ pub struct IggyShard {
 
     // Heart transplant of the old streams structure.
     pub(crate) streams2: Streams,
-    shards_table: EternalPtr<DashMap<IggyNamespace, ShardInfo>>,
+    pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardInfo>>,
     // TODO: Refactor.
     pub(crate) storage: Rc<SystemStorage>,
     pub(crate) state: StateKind,
@@ -274,6 +274,25 @@ impl IggyShard {
             )));
         }
 
+        tasks.push(Box::pin(
+            
crate::channels::commands::clean_personal_access_tokens::clear_personal_access_tokens(
+                self.clone(),
+            ),
+        ));
+        // TODO: Fix me — shard id 0 is not always the first shard.
+        if self.id == 0 {
+            tasks.push(Box::pin(
+                
crate::channels::commands::print_sysinfo::print_sys_info(self.clone()),
+            ));
+        }
+
+        tasks.push(Box::pin(
+            
crate::channels::commands::verify_heartbeats::verify_heartbeats(self.clone()),
+        ));
+        tasks.push(Box::pin(
+            
crate::channels::commands::save_messages::save_messages(self.clone()),
+        ));
+
         let stop_receiver = self.get_stop_receiver();
         let shard_for_shutdown = self.clone();
 
@@ -300,7 +319,6 @@ impl IggyShard {
 
     async fn load_segments(&self) -> Result<(), IggyError> {
         use crate::bootstrap::load_segments;
-        use crate::shard::namespace::IggyNamespace;
         for shard_entry in self.shards_table.iter() {
             let (namespace, shard_info) = shard_entry.pair();
 
@@ -342,7 +360,7 @@ impl IggyShard {
                             &Identifier::numeric(stream_id as u32).unwrap(),
                             &Identifier::numeric(topic_id as u32).unwrap(),
                             partition_id,
-                            |(_,_,_ , offset,  .., mut log)| {
+                            |(_, _, _, offset, .., log)| {
                                 *log = loaded_log;
                                 let current_offset = 
log.active_segment().end_offset;
                                 offset.store(current_offset, 
Ordering::Relaxed);
@@ -992,6 +1010,20 @@ impl IggyShard {
         self.shards_table.insert(ns, shard_info);
     }
 
+    pub fn get_current_shard_namespaces(&self) -> Vec<IggyNamespace> {
+        self.shards_table
+            .iter()
+            .filter_map(|entry| {
+                let (ns, shard_info) = entry.pair();
+                if shard_info.id == self.id {
+                    Some(*ns)
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+
     pub fn insert_shard_table_records(
         &self,
         records: impl IntoIterator<Item = (IggyNamespace, ShardInfo)>,
diff --git a/core/server/src/shard/system/clients.rs 
b/core/server/src/shard/system/clients.rs
index c4543148..ea225378 100644
--- a/core/server/src/shard/system/clients.rs
+++ b/core/server/src/shard/system/clients.rs
@@ -36,7 +36,7 @@ impl IggyShard {
         session
     }
 
-    pub async fn delete_client(&self, client_id: u32) {
+    pub fn delete_client(&self, client_id: u32) {
         let consumer_groups: Vec<(u32, u32, u32)>;
 
         {
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 549b8d82..02ab2850 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -214,18 +214,22 @@ impl MainOps for Streams {
 
         // Try committing the journal
         if is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded {
-            self.persist_messages(
-                shard_id,
+            let reason = self.with_partition_by_id(
                 stream_id,
                 topic_id,
                 partition_id,
-                unsaved_messages_count_exceeded,
-                unsaved_messages_size_exceeded,
-                journal_messages_count,
-                journal_size,
-                config,
-            )
-            .await?;
+                streaming_partitions::helpers::persist_reason(
+                    unsaved_messages_count_exceeded,
+                    unsaved_messages_size_exceeded,
+                    journal_messages_count,
+                    journal_size,
+                    config,
+                ),
+            );
+
+            let _batch_count = self
+                .persist_messages(shard_id, stream_id, topic_id, partition_id, 
reason, config)
+                .await?;
 
             if is_full {
                 self.handle_full_segment(shard_id, stream_id, topic_id, 
partition_id, config)
@@ -778,12 +782,16 @@ impl Streams {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
-        unsaved_messages_count_exceeded: bool,
-        unsaved_messages_size_exceeded: bool,
-        journal_messages_count: u32,
-        journal_size: u32,
+        reason: String,
         config: &SystemConfig,
-    ) -> Result<(), IggyError> {
+    ) -> Result<u32, IggyError> {
+        let is_empty = self.with_partition_by_id(stream_id, topic_id, 
partition_id, |(.., log)| {
+            log.journal().is_empty()
+        });
+        if is_empty {
+            return Ok(0);
+        }
+
         let batches = self.with_partition_by_id_mut(
             stream_id,
             topic_id,
@@ -791,18 +799,6 @@ impl Streams {
             streaming_partitions::helpers::commit_journal(),
         );
 
-        let reason = self.with_partition_by_id(
-            stream_id,
-            topic_id,
-            partition_id,
-            streaming_partitions::helpers::persist_reason(
-                unsaved_messages_count_exceeded,
-                unsaved_messages_size_exceeded,
-                journal_messages_count,
-                journal_size,
-                config,
-            ),
-        );
         let (saved, batch_count) = self
             .with_partition_by_id_async(
                 stream_id,
@@ -830,6 +826,6 @@ impl Streams {
             ),
         );
 
-        Ok(())
+        Ok(batch_count)
     }
 }
diff --git a/core/server/src/streaming/partitions/helpers.rs 
b/core/server/src/streaming/partitions/helpers.rs
index c16020d8..f9baf62e 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -295,7 +295,6 @@ pub async fn get_messages_by_offset(
     } else {
         0
     };
-    let actual_first_offset_journal = journal.get(|batches| 
batches.first_offset()).unwrap();
     let mut combined_batch_set = IggyMessagesBatchSet::empty();
 
     // Load messages from disk if needed
@@ -362,9 +361,6 @@ async fn load_messages_from_disk_by_offset(
     }
 
     let indexes_to_read = indexes_to_read.unwrap();
-    let first = indexes_to_read.get(0).unwrap();
-    let last = indexes_to_read.last().unwrap();
-
     let batch = storage
         .messages_reader
         .as_ref()
@@ -774,7 +770,6 @@ pub fn persist_batch(
                 )
             })?;
 
-        let indices = log.active_indexes().unwrap();
         let unsaved_indexes_slice = 
log.active_indexes().unwrap().unsaved_slice();
         let len = unsaved_indexes_slice.len();
         storage


Reply via email to