This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 89b8b70f feat(io_uring): migrate background tasks to new task runner
(#2181)
89b8b70f is described below
commit 89b8b70f11a91502cd3ae41b46c59f75e984f0a2
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Sep 19 14:17:42 2025 +0200
feat(io_uring): migrate background tasks to new task runner (#2181)
---
core/server/src/channels/commands/archive_state.rs | 143 ---------------------
.../commands/clean_personal_access_tokens.rs | 127 +++++-------------
core/server/src/channels/commands/mod.rs | 1 -
core/server/src/channels/commands/print_sysinfo.rs | 97 +++-----------
core/server/src/channels/commands/save_messages.rs | 142 ++++++++------------
.../src/channels/commands/verify_heartbeats.rs | 123 ++++--------------
core/server/src/channels/handler.rs | 46 -------
core/server/src/channels/mod.rs | 2 -
core/server/src/channels/server_command.rs | 39 ------
core/server/src/main.rs | 28 +---
core/server/src/quic/listener.rs | 4 +-
core/server/src/shard/mod.rs | 38 +++++-
core/server/src/shard/system/clients.rs | 2 +-
core/server/src/slab/streams.rs | 50 ++++---
core/server/src/streaming/partitions/helpers.rs | 5 -
15 files changed, 192 insertions(+), 655 deletions(-)
diff --git a/core/server/src/channels/commands/archive_state.rs
b/core/server/src/channels/commands/archive_state.rs
deleted file mode 100644
index f42393ae..00000000
--- a/core/server/src/channels/commands/archive_state.rs
+++ /dev/null
@@ -1,143 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-//TODO: Fixme
-/*
-use crate::channels::server_command::BackgroundServerCommand;
-use crate::configs::server::StateMaintenanceConfig;
-use crate::streaming::systems::system::SharedSystem;
-use flume::Sender;
-use iggy_common::IggyDuration;
-use iggy_common::IggyTimestamp;
-use tokio::time;
-use tracing::{error, info, instrument, warn};
-
-pub struct StateArchiver {
- enabled: bool,
- overwrite: bool,
- interval: IggyDuration,
- sender: Sender<ArchiveStateCommand>,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct ArchiveStateCommand {
- overwrite: bool,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct ArchiveStateExecutor;
-
-impl StateArchiver {
- pub fn new(config: &StateMaintenanceConfig, sender:
Sender<ArchiveStateCommand>) -> Self {
- Self {
- enabled: config.archiver_enabled,
- overwrite: config.overwrite,
- interval: config.interval,
- sender,
- }
- }
-
- pub fn start(&self) {
- if !self.enabled {
- info!("State archiver is disabled.");
- return;
- }
-
- let overwrite = self.overwrite;
- let interval = self.interval;
- let sender = self.sender.clone();
- info!("State archiver is enabled, state will be archived every:
{interval}.");
- tokio::spawn(async move {
- let mut interval_timer = time::interval(interval.get_duration());
- loop {
- interval_timer.tick().await;
- sender
- .send(ArchiveStateCommand { overwrite })
- .unwrap_or_else(|err| {
- error!("Failed to send ArchiveStateCommand. Error:
{}", err);
- });
- }
- });
- }
-}
-
-impl BackgroundServerCommand<ArchiveStateCommand> for ArchiveStateExecutor {
- #[instrument(skip_all, name = "trace_archive_state")]
- async fn execute(&mut self, system: &SharedSystem, command:
ArchiveStateCommand) {
- let system = system.read().await;
- if system.archiver.is_none() {
- warn!("Archiver is disabled, state will not be archived.");
- return;
- }
-
- let base_directory = if command.overwrite {
- None
- } else {
- Some(format!("{}_state", IggyTimestamp::now().as_micros()))
- };
- let state_messages_file_path =
system.config.get_state_messages_file_path();
- let state_info_path = system.config.get_state_info_path();
- info!("Archiving state...");
- let archiver = system.archiver.as_ref().unwrap();
- let files = [state_info_path.as_ref(),
state_messages_file_path.as_ref()];
- if let Err(error) = archiver.archive(&files, base_directory).await {
- error!("Failed to archive state. Error: {}", error);
- return;
- }
- info!("State archived successfully.");
- }
-
- fn start_command_sender(
- &mut self,
- _system: SharedSystem,
- config: &crate::configs::server::ServerConfig,
- sender: Sender<ArchiveStateCommand>,
- ) {
- if !config.data_maintenance.archiver.enabled
- || !config.data_maintenance.state.archiver_enabled
- {
- return;
- }
-
- let state_archiver =
StateArchiver::new(&config.data_maintenance.state, sender);
- state_archiver.start();
- }
-
- fn start_command_consumer(
- mut self,
- system: SharedSystem,
- config: &crate::configs::server::ServerConfig,
- receiver: flume::Receiver<ArchiveStateCommand>,
- ) {
- if !config.data_maintenance.archiver.enabled
- || !config.data_maintenance.state.archiver_enabled
- {
- return;
- }
-
- tokio::spawn(async move {
- let system = system.clone();
- while let Ok(command) = receiver.recv_async().await {
- self.execute(&system, command).await;
- }
- info!("State archiver receiver stopped.");
- });
- }
-}
-
-*/
diff --git a/core/server/src/channels/commands/clean_personal_access_tokens.rs
b/core/server/src/channels/commands/clean_personal_access_tokens.rs
index fd2eaf11..12153356 100644
--- a/core/server/src/channels/commands/clean_personal_access_tokens.rs
+++ b/core/server/src/channels/commands/clean_personal_access_tokens.rs
@@ -16,77 +16,36 @@
* under the License.
*/
-/*
-use crate::channels::server_command::BackgroundServerCommand;
-use crate::configs::server::PersonalAccessTokenCleanerConfig;
-use crate::streaming::systems::system::SharedSystem;
-use flume::Sender;
-use iggy_common::IggyDuration;
-use iggy_common::IggyTimestamp;
-use tokio::time;
-use tracing::{debug, error, info, instrument};
+use std::rc::Rc;
-pub struct PersonalAccessTokenCleaner {
- enabled: bool,
- interval: IggyDuration,
- sender: Sender<CleanPersonalAccessTokensCommand>,
-}
+use crate::shard::IggyShard;
+use iggy_common::{IggyError, IggyTimestamp};
+use tracing::{debug, info, trace};
-#[derive(Debug, Default, Clone)]
-pub struct CleanPersonalAccessTokensCommand;
-
-#[derive(Debug, Default, Clone)]
-pub struct CleanPersonalAccessTokensExecutor;
-
-impl PersonalAccessTokenCleaner {
- pub fn new(
- config: &PersonalAccessTokenCleanerConfig,
- sender: Sender<CleanPersonalAccessTokensCommand>,
- ) -> Self {
- Self {
- enabled: config.enabled,
- interval: config.interval,
- sender,
- }
+pub async fn clear_personal_access_tokens(shard: Rc<IggyShard>) -> Result<(),
IggyError> {
+ let config = &shard.config.personal_access_token.cleaner;
+ if !config.enabled {
+ info!("Personal access token cleaner is disabled.");
+ return Ok(());
}
- pub fn start(&self) {
- if !self.enabled {
- info!("Personal access token cleaner is disabled.");
- return;
- }
+ info!(
+ "Personal access token cleaner is enabled, expired tokens will be
deleted every: {}.",
+ config.interval
+ );
- let interval = self.interval;
- let sender = self.sender.clone();
- info!(
- "Personal access token cleaner is enabled, expired tokens will be
deleted every: {interval}."
- );
- tokio::spawn(async move {
- let mut interval_timer = time::interval(interval.get_duration());
- loop {
- interval_timer.tick().await;
- sender
- .send(CleanPersonalAccessTokensCommand)
- .unwrap_or_else(|error| {
- error!(
- "Failed to send CleanPersonalAccessTokensCommand.
Error: {}",
- error
- );
- });
- }
- });
- }
-}
+ let interval = config.interval.get_duration();
+ let mut interval_timer = compio::time::interval(interval);
+
+ loop {
+ interval_timer.tick().await;
+ trace!("Cleaning expired personal access tokens...");
-impl BackgroundServerCommand<CleanPersonalAccessTokensCommand>
- for CleanPersonalAccessTokensExecutor
-{
- #[instrument(skip_all, name = "trace_clean_personal_access_tokens")]
- async fn execute(&mut self, system: &SharedSystem, _command:
CleanPersonalAccessTokensCommand) {
- let system = system.read().await;
+ let users = shard.users.borrow();
let now = IggyTimestamp::now();
let mut deleted_tokens_count = 0;
- for (_, user) in system.users.iter() {
+
+ for (_, user) in users.iter() {
let expired_tokens = user
.personal_access_tokens
.iter()
@@ -96,45 +55,21 @@ impl
BackgroundServerCommand<CleanPersonalAccessTokensCommand>
for token in expired_tokens {
debug!(
- "Personal access token: {token} for user with ID: {} is
expired.",
- user.id
+ "Personal access token: {} for user with ID: {} is
expired.",
+ token, user.id
);
deleted_tokens_count += 1;
user.personal_access_tokens.remove(&token);
debug!(
- "Deleted personal access token: {token} for user with ID:
{}.",
- user.id
+ "Deleted personal access token: {} for user with ID: {}.",
+ token, user.id
);
}
}
- info!("Deleted {deleted_tokens_count} expired personal access
tokens.");
- }
-
- fn start_command_sender(
- &mut self,
- _system: SharedSystem,
- config: &crate::configs::server::ServerConfig,
- sender: Sender<CleanPersonalAccessTokensCommand>,
- ) {
- let personal_access_token_cleaner =
-
PersonalAccessTokenCleaner::new(&config.personal_access_token.cleaner, sender);
- personal_access_token_cleaner.start();
- }
- fn start_command_consumer(
- mut self,
- system: SharedSystem,
- _config: &crate::configs::server::ServerConfig,
- receiver: flume::Receiver<CleanPersonalAccessTokensCommand>,
- ) {
- tokio::spawn(async move {
- let system = system.clone();
- while let Ok(command) = receiver.recv_async().await {
- self.execute(&system, command).await;
- }
- info!("Personal access token cleaner receiver stopped.");
- });
+ info!(
+ "Deleted {} expired personal access tokens.",
+ deleted_tokens_count
+ );
}
-}
-
-*/
+}
\ No newline at end of file
diff --git a/core/server/src/channels/commands/mod.rs
b/core/server/src/channels/commands/mod.rs
index 13b1bf91..0130170b 100644
--- a/core/server/src/channels/commands/mod.rs
+++ b/core/server/src/channels/commands/mod.rs
@@ -16,7 +16,6 @@
* under the License.
*/
-pub mod archive_state;
pub mod clean_personal_access_tokens;
pub mod print_sysinfo;
pub mod save_messages;
diff --git a/core/server/src/channels/commands/print_sysinfo.rs
b/core/server/src/channels/commands/print_sysinfo.rs
index c0f2b1a2..05f68372 100644
--- a/core/server/src/channels/commands/print_sysinfo.rs
+++ b/core/server/src/channels/commands/print_sysinfo.rs
@@ -16,63 +16,33 @@
* under the License.
*/
-// TODO: Fixme
-/*
-use crate::{
- channels::server_command::BackgroundServerCommand,
- configs::server::ServerConfig,
- streaming::{systems::system::SharedSystem, utils::memory_pool},
-};
-use flume::{Receiver, Sender};
-use human_repr::HumanCount;
-use iggy_common::IggyDuration;
-use tokio::time::{self};
-use tracing::{error, info, warn};
-
-#[derive(Debug, Default, Clone)]
-pub struct SysInfoPrintCommand;
-
-pub struct SysInfoPrinter {
- interval: IggyDuration,
- sender: Sender<SysInfoPrintCommand>,
-}
-
-pub struct SysInfoPrintExecutor;
+use std::rc::Rc;
-impl SysInfoPrinter {
- pub fn new(interval: IggyDuration, sender: Sender<SysInfoPrintCommand>) ->
Self {
- Self { interval, sender }
- }
+use crate::shard::IggyShard;
+use crate::streaming::utils::memory_pool;
+use human_repr::HumanCount;
+use iggy_common::IggyError;
+use tracing::{error, info, trace};
- pub fn start(&self) {
- let interval = self.interval;
- let sender = self.sender.clone();
- if interval.is_zero() {
- info!("SysInfoPrinter is disabled.");
- return;
- }
+pub async fn print_sys_info(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+ let config = &shard.config.system.logging;
+ let interval = config.sysinfo_print_interval;
- info!("SysInfoPrinter is enabled, system information will be printed
every {interval}.");
- tokio::spawn(async move {
- let mut interval_timer = time::interval(interval.get_duration());
- loop {
- interval_timer.tick().await;
- let command = SysInfoPrintCommand {};
- sender.send(command).unwrap_or_else(|e| {
- error!("Failed to send SysInfoPrintCommand. Error: {e}");
- });
- }
- });
+ if interval.is_zero() {
+ info!("SysInfoPrinter is disabled.");
+ return Ok(());
}
-}
+ info!("SysInfoPrinter is enabled, system information will be printed every
{interval}.");
+ let mut interval_timer = compio::time::interval(interval.get_duration());
+ loop {
+ interval_timer.tick().await;
+ trace!("Printing system information...");
-impl BackgroundServerCommand<SysInfoPrintCommand> for SysInfoPrintExecutor {
- async fn execute(&mut self, system: &SharedSystem, _command:
SysInfoPrintCommand) {
- let stats = match system.read().await.get_stats().await {
+ let stats = match shard.get_stats().await {
Ok(stats) => stats,
Err(e) => {
error!("Failed to get system information. Error: {e}");
- return;
+ continue;
}
};
@@ -97,31 +67,4 @@ impl BackgroundServerCommand<SysInfoPrintCommand> for
SysInfoPrintExecutor {
memory_pool().log_stats();
}
-
- fn start_command_sender(
- &mut self,
- _system: SharedSystem,
- config: &ServerConfig,
- sender: Sender<SysInfoPrintCommand>,
- ) {
- let printer =
SysInfoPrinter::new(config.system.logging.sysinfo_print_interval, sender);
- printer.start();
- }
-
- fn start_command_consumer(
- mut self,
- system: SharedSystem,
- _config: &ServerConfig,
- receiver: Receiver<SysInfoPrintCommand>,
- ) {
- tokio::spawn(async move {
- let system = system.clone();
- while let Ok(command) = receiver.recv_async().await {
- self.execute(&system, command).await;
- }
- warn!("Sysinfo printer stopped receiving commands.");
- });
- }
-}
-
-*/
+}
\ No newline at end of file
diff --git a/core/server/src/channels/commands/save_messages.rs
b/core/server/src/channels/commands/save_messages.rs
index bf832762..51731042 100644
--- a/core/server/src/channels/commands/save_messages.rs
+++ b/core/server/src/channels/commands/save_messages.rs
@@ -16,107 +16,67 @@
* under the License.
*/
-//Todo: Fixme
-/*
-use crate::channels::server_command::BackgroundServerCommand;
-use crate::configs::server::MessageSaverConfig;
-use crate::configs::server::ServerConfig;
-use crate::streaming::systems::system::SharedSystem;
-use flume::{Receiver, Sender};
-use iggy_common::IggyDuration;
-use tokio::time;
-use tracing::{error, info, instrument, warn};
+use crate::{shard::IggyShard, shard_info};
+use iggy_common::{Identifier, IggyError};
+use std::rc::Rc;
+use tracing::{error, info, trace};
-pub struct MessagesSaver {
- enabled: bool,
- enforce_fsync: bool,
- interval: IggyDuration,
- sender: Sender<SaveMessagesCommand>,
-}
+pub async fn save_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+ let config = &shard.config.message_saver;
+ if !config.enabled {
+ info!("Message saver is disabled.");
+ return Ok(());
+ }
-#[derive(Debug, Default, Clone)]
-pub struct SaveMessagesCommand {
- pub enforce_fsync: bool,
-}
+ // TODO: Maybe we should get rid of it in order to not complicate, and use
the fsync settings per partition from config.
+ let enforce_fsync = config.enforce_fsync;
+ let interval = config.interval;
+ info!(
+ "Message saver is enabled, buffered messages will be automatically
saved every: {interval}, enforce fsync: {enforce_fsync}."
+ );
-#[derive(Debug, Default, Clone)]
-pub struct SaveMessagesExecutor;
+ let mut interval_timer = compio::time::interval(interval.get_duration());
+ loop {
+ interval_timer.tick().await;
+ trace!("Saving buffered messages...");
-impl MessagesSaver {
- pub fn new(config: &MessageSaverConfig, sender:
Sender<SaveMessagesCommand>) -> Self {
- Self {
- enabled: config.enabled,
- enforce_fsync: config.enforce_fsync,
- interval: config.interval,
- sender,
- }
- }
+ let namespaces = shard.get_current_shard_namespaces();
+ let mut total_saved_messages = 0u32;
+ let reason = "background saver triggered".to_string();
- pub fn start(&self) {
- if !self.enabled {
- info!("Message saver is disabled.");
- return;
- }
+ for ns in namespaces {
+ let stream_id = Identifier::numeric(ns.stream_id() as
u32).unwrap();
+ let topic_id = Identifier::numeric(ns.topic_id() as u32).unwrap();
+ let partition_id = ns.partition_id();
- let enforce_fsync = self.enforce_fsync;
- let interval = self.interval;
- let sender = self.sender.clone();
- info!(
- "Message saver is enabled, buffered messages will be automatically
saved every: {interval}, enforce fsync: {enforce_fsync}."
- );
- tokio::spawn(async move {
- let mut interval_timer = time::interval(interval.get_duration());
- loop {
- interval_timer.tick().await;
- let command = SaveMessagesCommand { enforce_fsync };
- sender.send(command).unwrap_or_else(|e| {
- error!("Failed to send SaveMessagesCommand. Error: {e}",);
- });
- }
- });
- }
-}
-
-impl BackgroundServerCommand<SaveMessagesCommand> for SaveMessagesExecutor {
- #[instrument(skip_all, name = "trace_save_messages")]
- async fn execute(&mut self, system: &SharedSystem, _command:
SaveMessagesCommand) {
- let saved_messages_count =
system.read().await.persist_messages().await;
- match saved_messages_count {
- Ok(n) => {
- if n > 0 {
- info!("Saved {n} buffered messages on disk.");
+ match shard
+ .streams2
+ .persist_messages(
+ shard.id,
+ &stream_id,
+ &topic_id,
+ partition_id,
+ reason.clone(),
+ &shard.config.system,
+ )
+ .await
+ {
+ Ok(batch_count) => {
+ total_saved_messages += batch_count;
+ }
+ Err(err) => {
+ error!(
+ "Failed to save messages for partition {}: {}",
+ partition_id, err
+ );
}
- }
- Err(e) => {
- error!("Couldn't save buffered messages on disk. Error: {e}");
}
}
- }
- fn start_command_sender(
- &mut self,
- _system: SharedSystem,
- config: &ServerConfig,
- sender: Sender<SaveMessagesCommand>,
- ) {
- let messages_saver = MessagesSaver::new(&config.message_saver, sender);
- messages_saver.start();
- }
+ if total_saved_messages > 0 {
+ shard_info!(shard.id, "Saved {} buffered messages on disk.",
total_saved_messages);
+ }
- fn start_command_consumer(
- mut self,
- system: SharedSystem,
- _config: &ServerConfig,
- receiver: Receiver<SaveMessagesCommand>,
- ) {
- tokio::spawn(async move {
- let system = system.clone();
- while let Ok(command) = receiver.recv_async().await {
- self.execute(&system, command).await;
- }
- warn!("Server command handler stopped receiving commands.");
- });
+ trace!("Finished saving buffered messages.");
}
}
-
-*/
diff --git a/core/server/src/channels/commands/verify_heartbeats.rs
b/core/server/src/channels/commands/verify_heartbeats.rs
index 74eacabe..123c174f 100644
--- a/core/server/src/channels/commands/verify_heartbeats.rs
+++ b/core/server/src/channels/commands/verify_heartbeats.rs
@@ -16,88 +16,41 @@
* under the License.
*/
-//TODO: Fixme
-/*
-use crate::channels::server_command::BackgroundServerCommand;
-use crate::configs::server::HeartbeatConfig;
-use crate::streaming::systems::system::SharedSystem;
-use flume::Sender;
-use iggy_common::IggyDuration;
-use iggy_common::IggyTimestamp;
-use iggy_common::locking::IggyRwLockFn;
-use tokio::time;
-use tracing::{debug, error, info, instrument, warn};
+use crate::shard::IggyShard;
+use iggy_common::{IggyDuration, IggyError, IggyTimestamp};
+use std::rc::Rc;
+use tracing::{debug, info, trace, warn};
const MAX_THRESHOLD: f64 = 1.2;
-pub struct VerifyHeartbeats {
- enabled: bool,
- interval: IggyDuration,
- sender: Sender<VerifyHeartbeatsCommand>,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct VerifyHeartbeatsCommand {
- interval: IggyDuration,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct VerifyHeartbeatsExecutor;
-
-impl VerifyHeartbeats {
- pub fn new(config: &HeartbeatConfig, sender:
Sender<VerifyHeartbeatsCommand>) -> Self {
- Self {
- enabled: config.enabled,
- interval: config.interval,
- sender,
- }
+pub async fn verify_heartbeats(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+ let config = &shard.config.heartbeat;
+ if !config.enabled {
+ info!("Heartbeats verification is disabled.");
+ return Ok(());
}
- pub fn start(&self) {
- if !self.enabled {
- info!("Heartbeats verification is disabled.");
- return;
- }
+ let interval = config.interval;
+ let max_interval = IggyDuration::from((MAX_THRESHOLD *
interval.as_micros() as f64) as u64);
+ info!("Heartbeats will be verified every: {interval}. Max allowed
interval: {max_interval}.");
- let interval = self.interval;
- let max_interval = IggyDuration::from((MAX_THRESHOLD *
interval.as_micros() as f64) as u64);
- let sender = self.sender.clone();
- info!(
- "Heartbeats will be verified every: {interval}. Max allowed
interval: {max_interval}."
- );
- tokio::spawn(async move {
- let mut interval_timer = time::interval(interval.get_duration());
- loop {
- interval_timer.tick().await;
- debug!("Verifying heartbeats...");
- sender
- .send(VerifyHeartbeatsCommand {
- interval: max_interval,
- })
- .unwrap_or_else(|error| {
- error!("Failed to send VerifyHeartbeats. Error: {}",
error);
- });
- }
- });
- }
-}
+ let mut interval_timer = compio::time::interval(interval.get_duration());
-impl BackgroundServerCommand<VerifyHeartbeatsCommand> for
VerifyHeartbeatsExecutor {
- #[instrument(skip_all, name = "trace_verify_heartbeats")]
- async fn execute(&mut self, system: &SharedSystem, command:
VerifyHeartbeatsCommand) {
- let system = system.read().await;
- let clients;
- {
- let client_manager = system.client_manager.read().await;
- clients = client_manager.get_clients();
- }
+ loop {
+ interval_timer.tick().await;
+ trace!("Verifying heartbeats...");
+
+ let clients = {
+ let client_manager = shard.client_manager.borrow();
+ client_manager.get_clients()
+ };
let now = IggyTimestamp::now();
- let heartbeat_to = IggyTimestamp::from(now.as_micros() -
command.interval.as_micros());
+ let heartbeat_to = IggyTimestamp::from(now.as_micros() -
max_interval.as_micros());
debug!("Verifying heartbeats at: {now}, max allowed timestamp:
{heartbeat_to}");
+
let mut stale_clients = Vec::new();
for client in clients {
- let client = client.read().await;
if client.last_heartbeat.as_micros() < heartbeat_to.as_micros() {
warn!(
"Stale client session: {}, last heartbeat at: {}, max
allowed timestamp: {heartbeat_to}",
@@ -114,40 +67,14 @@ impl BackgroundServerCommand<VerifyHeartbeatsCommand> for
VerifyHeartbeatsExecut
}
if stale_clients.is_empty() {
- return;
+ continue;
}
let count = stale_clients.len();
info!("Removing {count} stale clients...");
for client_id in stale_clients {
- system.delete_client(client_id).await;
+ shard.delete_client(client_id);
}
info!("Removed {count} stale clients.");
}
-
- fn start_command_sender(
- &mut self,
- _system: SharedSystem,
- config: &crate::configs::server::ServerConfig,
- sender: Sender<VerifyHeartbeatsCommand>,
- ) {
- let verify_heartbeats = VerifyHeartbeats::new(&config.heartbeat,
sender);
- verify_heartbeats.start();
- }
-
- fn start_command_consumer(
- mut self,
- system: SharedSystem,
- _config: &crate::configs::server::ServerConfig,
- receiver: flume::Receiver<VerifyHeartbeatsCommand>,
- ) {
- tokio::spawn(async move {
- let system = system.clone();
- while let Ok(command) = receiver.recv_async().await {
- self.execute(&system, command).await;
- }
- info!("Heartbeats verifier receiver stopped.");
- });
- }
}
-*/
diff --git a/core/server/src/channels/handler.rs
b/core/server/src/channels/handler.rs
deleted file mode 100644
index ee44fd58..00000000
--- a/core/server/src/channels/handler.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-use std::rc::Rc;
-
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-use super::server_command::BackgroundServerCommand;
-use crate::{configs::server::ServerConfig, shard::IggyShard};
-
-pub struct BackgroundServerCommandHandler<'a> {
- shard: Rc<IggyShard>,
- config: &'a ServerConfig,
-}
-
-impl<'a> BackgroundServerCommandHandler<'a> {
- pub fn new(shard: Rc<IggyShard>, config: &'a ServerConfig) -> Self {
- Self { shard, config }
- }
-
- pub fn install_handler<C, E>(&mut self, mut executor: E) -> Self
- where
- E: BackgroundServerCommand<C> + Send + Sync + 'static,
- {
- let (sender, receiver) = flume::unbounded();
- let shard = self.shard.clone();
- executor.start_command_sender(shard.clone(), self.config, sender);
- executor.start_command_consumer(shard.clone(), self.config, receiver);
- Self {
- shard,
- config: self.config,
- }
- }
-}
diff --git a/core/server/src/channels/mod.rs b/core/server/src/channels/mod.rs
index c3da041b..d147f7e0 100644
--- a/core/server/src/channels/mod.rs
+++ b/core/server/src/channels/mod.rs
@@ -17,5 +17,3 @@
*/
pub mod commands;
-pub mod handler;
-pub mod server_command;
diff --git a/core/server/src/channels/server_command.rs
b/core/server/src/channels/server_command.rs
deleted file mode 100644
index ac490737..00000000
--- a/core/server/src/channels/server_command.rs
+++ /dev/null
@@ -1,39 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::{configs::server::ServerConfig, shard::IggyShard};
-use flume::{Receiver, Sender};
-use std::{future::Future, rc::Rc};
-
-pub trait BackgroundServerCommand<C> {
- fn execute(&mut self, system: &IggyShard, command: C) -> impl
Future<Output = ()>;
-
- fn start_command_sender(
- &mut self,
- shard: Rc<IggyShard>,
- config: &ServerConfig,
- sender: Sender<C>,
- );
-
- fn start_command_consumer(
- self,
- shard: Rc<IggyShard>,
- config: &ServerConfig,
- receiver: Receiver<C>,
- );
-}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 02ed206a..86d9576d 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -50,7 +50,9 @@ use server::server_error::{ConfigError, ServerError};
use server::shard::namespace::IggyNamespace;
use server::shard::system::info::SystemInfo;
use server::shard::{IggyShard, ShardInfo, calculate_shard_assignment};
-use server::slab::traits_ext::{EntityComponentSystem,
EntityComponentSystemMutCell, IntoComponents};
+use server::slab::traits_ext::{
+ EntityComponentSystem, EntityComponentSystemMutCell, IntoComponents,
+};
use server::state::StateKind;
use server::state::command::EntryCommand;
use server::state::file::FileState;
@@ -270,7 +272,7 @@ async fn main() -> Result<(), ServerError> {
));
// Ergh... I knew this will backfire to include `Log` as part of the
`Partition` entity,
- // We have to initialize with an default log with every partition,
once we `Clone` the Streams / Topics / Partitions,
+ // We have to initialize with a default log for every partition, once
we `Clone` the Streams / Topics / Partitions,
// because `Clone` impl for `Partition` does not clone the actual log,
just creates an empty one.
streams.with_components(|components| {
let (root, ..) = components.into_components();
@@ -356,7 +358,6 @@ async fn main() -> Result<(), ServerError> {
#[cfg(not(feature = "disable-mimalloc"))]
info!("Using mimalloc allocator");
-
let system = SharedSystem::new(System::new(
config.system.clone(),
config.cluster.clone(),
@@ -365,28 +366,7 @@ async fn main() -> Result<(), ServerError> {
));
*/
- // Workaround to ensure that the statistics are initialized before the
server
- // loads streams and starts accepting connections. This is necessary to
- // have the correct statistics when the server starts.
- //system.write().await.get_stats().await?;
- //system.write().await.init().await?;
-
/*
- let _command_handler = BackgroundServerCommandHandler::new(system.clone(),
&config)
- .install_handler(SaveMessagesExecutor)
- .install_handler(MaintainMessagesExecutor)
- .install_handler(CleanPersonalAccessTokensExecutor)
- .install_handler(SysInfoPrintExecutor)
- .install_handler(VerifyHeartbeatsExecutor);
-
- #[cfg(unix)]
- let (mut ctrl_c, mut sigterm) = {
- use tokio::signal::unix::{SignalKind, signal};
- (
- signal(SignalKind::interrupt())?,
- signal(SignalKind::terminate())?,
- )
- };
let mut current_config = config.clone();
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index f9dfa471..a2d00eda 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -118,12 +118,12 @@ async fn accept_stream(
match connection.accept_bi().await {
Err(compio_quic::ConnectionError::ApplicationClosed { .. }) => {
info!("Connection closed");
- shard.delete_client(client_id).await;
+ shard.delete_client(client_id);
Ok(None)
}
Err(error) => {
error!("Error when handling QUIC stream: {:?}", error);
- shard.delete_client(client_id).await;
+ shard.delete_client(client_id);
Err(error.into())
}
Ok(stream) => Ok(Some(stream)),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index e64cafb6..84b038ca 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -153,7 +153,7 @@ pub struct IggyShard {
// Heart transplant of the old streams structure.
pub(crate) streams2: Streams,
- shards_table: EternalPtr<DashMap<IggyNamespace, ShardInfo>>,
+ pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardInfo>>,
// TODO: Refactor.
pub(crate) storage: Rc<SystemStorage>,
pub(crate) state: StateKind,
@@ -274,6 +274,25 @@ impl IggyShard {
)));
}
+ tasks.push(Box::pin(
+
crate::channels::commands::clean_personal_access_tokens::clear_personal_access_tokens(
+ self.clone(),
+ ),
+ ));
+ // TODO: Fixme, not always id 0 is the first shard.
+ if self.id == 0 {
+ tasks.push(Box::pin(
+
crate::channels::commands::print_sysinfo::print_sys_info(self.clone()),
+ ));
+ }
+
+ tasks.push(Box::pin(
+
crate::channels::commands::verify_heartbeats::verify_heartbeats(self.clone()),
+ ));
+ tasks.push(Box::pin(
+
crate::channels::commands::save_messages::save_messages(self.clone()),
+ ));
+
let stop_receiver = self.get_stop_receiver();
let shard_for_shutdown = self.clone();
@@ -300,7 +319,6 @@ impl IggyShard {
async fn load_segments(&self) -> Result<(), IggyError> {
use crate::bootstrap::load_segments;
- use crate::shard::namespace::IggyNamespace;
for shard_entry in self.shards_table.iter() {
let (namespace, shard_info) = shard_entry.pair();
@@ -342,7 +360,7 @@ impl IggyShard {
&Identifier::numeric(stream_id as u32).unwrap(),
&Identifier::numeric(topic_id as u32).unwrap(),
partition_id,
- |(_,_,_ , offset, .., mut log)| {
+ |(_, _, _, offset, .., log)| {
*log = loaded_log;
let current_offset =
log.active_segment().end_offset;
offset.store(current_offset,
Ordering::Relaxed);
@@ -992,6 +1010,20 @@ impl IggyShard {
self.shards_table.insert(ns, shard_info);
}
+ pub fn get_current_shard_namespaces(&self) -> Vec<IggyNamespace> {
+ self.shards_table
+ .iter()
+ .filter_map(|entry| {
+ let (ns, shard_info) = entry.pair();
+ if shard_info.id == self.id {
+ Some(*ns)
+ } else {
+ None
+ }
+ })
+ .collect()
+ }
+
pub fn insert_shard_table_records(
&self,
records: impl IntoIterator<Item = (IggyNamespace, ShardInfo)>,
diff --git a/core/server/src/shard/system/clients.rs
b/core/server/src/shard/system/clients.rs
index c4543148..ea225378 100644
--- a/core/server/src/shard/system/clients.rs
+++ b/core/server/src/shard/system/clients.rs
@@ -36,7 +36,7 @@ impl IggyShard {
session
}
- pub async fn delete_client(&self, client_id: u32) {
+ pub fn delete_client(&self, client_id: u32) {
let consumer_groups: Vec<(u32, u32, u32)>;
{
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 549b8d82..02ab2850 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -214,18 +214,22 @@ impl MainOps for Streams {
// Try committing the journal
if is_full || unsaved_messages_count_exceeded ||
unsaved_messages_size_exceeded {
- self.persist_messages(
- shard_id,
+ let reason = self.with_partition_by_id(
stream_id,
topic_id,
partition_id,
- unsaved_messages_count_exceeded,
- unsaved_messages_size_exceeded,
- journal_messages_count,
- journal_size,
- config,
- )
- .await?;
+ streaming_partitions::helpers::persist_reason(
+ unsaved_messages_count_exceeded,
+ unsaved_messages_size_exceeded,
+ journal_messages_count,
+ journal_size,
+ config,
+ ),
+ );
+
+ let _batch_count = self
+ .persist_messages(shard_id, stream_id, topic_id, partition_id,
reason, config)
+ .await?;
if is_full {
self.handle_full_segment(shard_id, stream_id, topic_id,
partition_id, config)
@@ -778,12 +782,16 @@ impl Streams {
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: usize,
- unsaved_messages_count_exceeded: bool,
- unsaved_messages_size_exceeded: bool,
- journal_messages_count: u32,
- journal_size: u32,
+ reason: String,
config: &SystemConfig,
- ) -> Result<(), IggyError> {
+ ) -> Result<u32, IggyError> {
+ let is_empty = self.with_partition_by_id(stream_id, topic_id,
partition_id, |(.., log)| {
+ log.journal().is_empty()
+ });
+ if is_empty {
+ return Ok(0);
+ }
+
let batches = self.with_partition_by_id_mut(
stream_id,
topic_id,
@@ -791,18 +799,6 @@ impl Streams {
streaming_partitions::helpers::commit_journal(),
);
- let reason = self.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- streaming_partitions::helpers::persist_reason(
- unsaved_messages_count_exceeded,
- unsaved_messages_size_exceeded,
- journal_messages_count,
- journal_size,
- config,
- ),
- );
let (saved, batch_count) = self
.with_partition_by_id_async(
stream_id,
@@ -830,6 +826,6 @@ impl Streams {
),
);
- Ok(())
+ Ok(batch_count)
}
}
diff --git a/core/server/src/streaming/partitions/helpers.rs
b/core/server/src/streaming/partitions/helpers.rs
index c16020d8..f9baf62e 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -295,7 +295,6 @@ pub async fn get_messages_by_offset(
} else {
0
};
- let actual_first_offset_journal = journal.get(|batches|
batches.first_offset()).unwrap();
let mut combined_batch_set = IggyMessagesBatchSet::empty();
// Load messages from disk if needed
@@ -362,9 +361,6 @@ async fn load_messages_from_disk_by_offset(
}
let indexes_to_read = indexes_to_read.unwrap();
- let first = indexes_to_read.get(0).unwrap();
- let last = indexes_to_read.last().unwrap();
-
let batch = storage
.messages_reader
.as_ref()
@@ -774,7 +770,6 @@ pub fn persist_batch(
)
})?;
- let indices = log.active_indexes().unwrap();
let unsaved_indexes_slice =
log.active_indexes().unwrap().unsaved_slice();
let len = unsaved_indexes_slice.len();
storage