This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new b9487b2c fix(server): resolve TODOs and optimize reference usage
(#2248)
b9487b2c is described below
commit b9487b2caf6a7dd0348d91e3b28d02002b0152cd
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Oct 7 11:46:50 2025 +0200
fix(server): resolve TODOs and optimize reference usage (#2248)
- Fix consumer_offsets to use with_partition_by_id directly
- Move calculate_partition_id_by_messages_key_hash to topics helpers
- Refactor purge_stream2 to use purge_stream2_base helper
- Remove unnecessary user lookup in logout_user_base
- Replace Arc with references in snapshot functions
- Document snapshot thread pool limitation
---
core/server/src/shard/system/consumer_offsets.rs | 26 ++++++++++++++-----
core/server/src/shard/system/messages.rs | 33 ++++++------------------
core/server/src/shard/system/snapshot/mod.rs | 20 +++++++++-----
core/server/src/shard/system/streams.rs | 13 +---------
core/server/src/shard/system/users.rs | 14 ++--------
core/server/src/streaming/topics/helpers.rs | 18 +++++++++++++
6 files changed, 61 insertions(+), 63 deletions(-)
diff --git a/core/server/src/shard/system/consumer_offsets.rs
b/core/server/src/shard/system/consumer_offsets.rs
index af5f71bb..47ab62a6 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -193,14 +193,23 @@ impl IggyShard {
partition_id: usize,
offset: u64,
) {
- // TODO: This can use `with_partition_by_id` directly.
+ let stream_id_num = self
+ .streams2
+ .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
+ let topic_id_num =
+ self.streams2
+ .with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
+
match polling_consumer {
PollingConsumer::Consumer(id, _) => {
- self.streams2.with_stream_by_id(
+ self.streams2.with_partition_by_id(
stream_id,
- streams::helpers::store_consumer_offset(
+ topic_id,
+ partition_id,
+ partitions::helpers::store_consumer_offset(
*id,
- topic_id,
+ stream_id_num,
+ topic_id_num,
partition_id,
offset,
&self.config.system,
@@ -208,11 +217,14 @@ impl IggyShard {
);
}
PollingConsumer::ConsumerGroup(_, id) => {
- self.streams2.with_stream_by_id(
+ self.streams2.with_partition_by_id(
stream_id,
- streams::helpers::store_consumer_group_member_offset(
+ topic_id,
+ partition_id,
+ partitions::helpers::store_consumer_group_member_offset(
*id,
- topic_id,
+ stream_id_num,
+ topic_id_num,
partition_id,
offset,
&self.config.system,
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index efef5a1c..70baa14c 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -24,13 +24,12 @@ use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
};
-use crate::shard_trace;
use crate::streaming::partitions::journal::Journal;
use crate::streaming::polling_consumer::PollingConsumer;
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut,
IggyMessagesBatchSet};
use crate::streaming::session::Session;
use crate::streaming::traits::MainOps;
-use crate::streaming::utils::{PooledBuffer, hash};
+use crate::streaming::utils::PooledBuffer;
use crate::streaming::{partitions, streams, topics};
use error_set::ErrContext;
use iggy_common::{
@@ -49,24 +48,6 @@ impl IggyShard {
partitioning: &Partitioning,
batch: IggyMessagesBatchMut,
) -> Result<(), IggyError> {
- // TODO: move to helpers.
- fn calculate_partition_id_by_messages_key_hash(
- shard_id: u16,
- upperbound: usize,
- messages_key: &[u8],
- ) -> usize {
- let messages_key_hash = hash::calculate_32(messages_key) as usize;
- let partition_id = messages_key_hash % upperbound;
- shard_trace!(
- shard_id,
- "Calculated partition ID: {} for messages key: {:?}, hash: {}",
- partition_id,
- messages_key,
- messages_key_hash
- );
- partition_id
- }
-
let numeric_stream_id = self
.streams2
.with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
@@ -108,11 +89,13 @@ impl IggyShard {
) as usize),
PartitioningKind::MessagesKey => {
let upperbound = root.partitions().len();
- Ok(calculate_partition_id_by_messages_key_hash(
- self.id,
- upperbound,
- &partitioning.value,
- ))
+ Ok(
+
topics::helpers::calculate_partition_id_by_messages_key_hash(
+ self.id,
+ upperbound,
+ &partitioning.value,
+ ),
+ )
}
},
)?;
diff --git a/core/server/src/shard/system/snapshot/mod.rs
b/core/server/src/shard/system/snapshot/mod.rs
index 9244a0c4..b97b5dcd 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -27,13 +27,19 @@ use compio::fs::OpenOptions;
use compio::io::AsyncWriteAtExt;
use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression,
SystemSnapshotType};
use std::path::PathBuf;
-// TODO: compio has an `process` module, consider using that instead, but read
the docs carefully // https://compio.rs/docs/compio/process
-use std::process::Command;
-use std::sync::Arc;
use std::time::Instant;
use tempfile::NamedTempFile;
use tracing::{error, info};
+// TODO(hubcio): compio has a `process` module, but it currently blocks the
executor when the runtime
+// has thread_pool_limit(0) configured (which we do on non-macOS platforms in
bootstrap.rs).
+// To use compio::process::Command, we need to either:
+// 1. Enable thread pool by removing/increasing thread_pool_limit(0)
+// 2. Use std::process::Command with compio::runtime::spawn_blocking (requires
thread pool)
+// 3. Find alternative approach that doesn't rely on thread pool
+// See: https://compio.rs/docs/compio/process and
bootstrap::create_shard_executor
+use std::process::Command;
+
impl IggyShard {
pub async fn get_snapshot(
&self,
@@ -68,7 +74,7 @@ impl IggyShard {
for snapshot_type in snapshot_types {
info!("Processing snapshot type: {:?}", snapshot_type);
- match get_command_result(snapshot_type,
self.config.system.clone()).await {
+ match get_command_result(snapshot_type, &self.config.system).await
{
Ok(temp_file) => {
info!(
"Got temp file for {:?}: {}",
@@ -215,7 +221,7 @@ async fn get_test_snapshot() -> Result<NamedTempFile,
std::io::Error> {
write_command_output_to_temp_file(Command::new("echo").arg("test")).await
}
-async fn get_server_logs(config: Arc<SystemConfig>) -> Result<NamedTempFile,
std::io::Error> {
+async fn get_server_logs(config: &SystemConfig) -> Result<NamedTempFile,
std::io::Error> {
let base_directory = PathBuf::from(config.get_system_path());
let logs_subdirectory = PathBuf::from(&config.logging.path);
let logs_path = base_directory.join(logs_subdirectory);
@@ -228,7 +234,7 @@ async fn get_server_logs(config: Arc<SystemConfig>) ->
Result<NamedTempFile, std
write_command_output_to_temp_file(Command::new("sh").args(["-c",
&list_and_cat])).await
}
-async fn get_server_config(config: Arc<SystemConfig>) -> Result<NamedTempFile,
std::io::Error> {
+async fn get_server_config(config: &SystemConfig) -> Result<NamedTempFile,
std::io::Error> {
let base_directory = PathBuf::from(config.get_system_path());
let config_path =
base_directory.join("runtime").join("current_config.toml");
@@ -237,7 +243,7 @@ async fn get_server_config(config: Arc<SystemConfig>) ->
Result<NamedTempFile, s
async fn get_command_result(
snapshot_type: &SystemSnapshotType,
- config: Arc<SystemConfig>,
+ config: &SystemConfig,
) -> Result<NamedTempFile, std::io::Error> {
match snapshot_type {
SystemSnapshotType::FilesystemOverview =>
get_filesystem_overview().await,
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index cd497401..6e76b2ca 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -178,18 +178,7 @@ impl IggyShard {
})?;
}
- //TODO: Tech debt.
- let topic_ids = self
- .streams2
- .with_stream_by_id(stream_id, streams::helpers::get_topic_ids());
-
- // Purge each topic in the stream using bypass auth
- for topic_id in topic_ids {
- let topic_identifier = Identifier::numeric(topic_id as
u32).unwrap();
- self.purge_topic2(session, stream_id, &topic_identifier)
- .await?;
- }
- Ok(())
+ self.purge_stream2_base(stream_id).await
}
pub async fn purge_stream2_bypass_auth(&self, stream_id: &Identifier) ->
Result<(), IggyError> {
diff --git a/core/server/src/shard/system/users.rs
b/core/server/src/shard/system/users.rs
index aef6754b..8aa7d963 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -515,21 +515,11 @@ impl IggyShard {
pub fn logout_user(&self, session: &Session) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
let client_id = session.client_id;
- let user_id = session.get_user_id();
- self.logout_user_base(user_id, client_id)?;
+ self.logout_user_base(client_id)?;
Ok(())
}
- fn logout_user_base(&self, user_id: u32, client_id: u32) -> Result<(),
IggyError> {
- // TODO(hubcio): not sure if user is needed here
- let _user = self
- .get_user(&Identifier::numeric(user_id)?)
- .with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to get user with
id: {}",
- user_id,
- )
- })?;
+ fn logout_user_base(&self, client_id: u32) -> Result<(), IggyError> {
if client_id > 0 {
let mut client_manager = self.client_manager.borrow_mut();
client_manager.clear_user_id(client_id)?;
diff --git a/core/server/src/streaming/topics/helpers.rs
b/core/server/src/streaming/topics/helpers.rs
index 1ddc0454..87c29c36 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -16,6 +16,7 @@ use crate::{
},
topic2::{Topic, TopicRef, TopicRefMut, TopicRoot},
},
+ utils::hash,
},
};
use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
@@ -51,6 +52,23 @@ pub fn get_max_topic_size() -> impl
FnOnce(ComponentsById<TopicRef>) -> MaxTopic
|(root, _, _)| root.max_topic_size()
}
+pub fn calculate_partition_id_by_messages_key_hash(
+ shard_id: u16,
+ upperbound: usize,
+ messages_key: &[u8],
+) -> usize {
+ let messages_key_hash = hash::calculate_32(messages_key) as usize;
+ let partition_id = messages_key_hash % upperbound;
+ shard_trace!(
+ shard_id,
+ "Calculated partition ID: {} for messages key: {:?}, hash: {}",
+ partition_id,
+ messages_key,
+ messages_key_hash
+ );
+ partition_id
+}
+
pub fn delete_topic(topic_id: &Identifier) -> impl FnOnce(&Topics) -> Topic {
|container| {
let id = container.get_index(topic_id);