This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new f21d85f4 feat(io_uring): fix rest of lints and tests (#2242)
f21d85f4 is described below

commit f21d85f45fc6dd54b31f66c64c3201a24ebaa92d
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Oct 6 17:44:06 2025 +0200

    feat(io_uring): fix rest of lints and tests (#2242)
    
    Co-authored-by: numminex <[email protected]>
---
 .../tests/cli/stream/test_stream_purge_command.rs  |  2 --
 .../tests/cli/topic/test_topic_create_command.rs   |  7 ----
 .../tests/cli/topic/test_topic_list_command.rs     |  6 ----
 core/integration/tests/streaming/mod.rs            |  2 +-
 core/server/src/binary/sender.rs                   |  4 +--
 core/server/src/http/http_shard_wrapper.rs         |  1 +
 core/server/src/http/partitions.rs                 | 40 +---------------------
 core/server/src/shard/mod.rs                       |  2 +-
 core/server/src/shard/system/messages.rs           |  1 +
 core/server/src/shard/system/partitions.rs         |  6 +---
 core/server/src/shard/system/streams.rs            |  2 +-
 core/server/src/shard/system/topics.rs             |  6 +---
 core/server/src/shard/system/users.rs              |  5 +--
 core/server/src/shard/transmission/event.rs        |  1 +
 core/server/src/shard/transmission/message.rs      |  2 ++
 core/server/src/slab/streams.rs                    |  5 +--
 core/server/src/state/mod.rs                       |  3 ++
 core/server/src/streaming/partitions/partition2.rs |  2 +-
 core/server/src/streaming/partitions/storage2.rs   |  9 ++---
 core/server/src/streaming/topics/helpers.rs        |  2 +-
 core/server/src/streaming/topics/storage2.rs       |  9 +----
 21 files changed, 25 insertions(+), 92 deletions(-)

diff --git a/core/integration/tests/cli/stream/test_stream_purge_command.rs b/core/integration/tests/cli/stream/test_stream_purge_command.rs
index 78f897f2..571f5add 100644
--- a/core/integration/tests/cli/stream/test_stream_purge_command.rs
+++ b/core/integration/tests/cli/stream/test_stream_purge_command.rs
@@ -31,7 +31,6 @@ struct TestStreamPurgeCmd {
     stream_id: u32,
     stream_name: String,
     using_identifier: TestStreamId,
-    topic_id: u32,
     topic_name: String,
 }
 
@@ -41,7 +40,6 @@ impl TestStreamPurgeCmd {
             stream_id,
             stream_name: name,
             using_identifier,
-            topic_id: 0,
             topic_name: String::from("test_topic"),
         }
     }
diff --git a/core/integration/tests/cli/topic/test_topic_create_command.rs b/core/integration/tests/cli/topic/test_topic_create_command.rs
index 96e100d4..cb1718f9 100644
--- a/core/integration/tests/cli/topic/test_topic_create_command.rs
+++ b/core/integration/tests/cli/topic/test_topic_create_command.rs
@@ -36,7 +36,6 @@ use std::time::Duration;
 struct TestTopicCreateCmd {
     stream_id: u32,
     stream_name: String,
-    topic_id: Option<u32>,
     topic_name: String,
     partitions_count: u32,
     compression_algorithm: CompressionAlgorithm,
@@ -51,7 +50,6 @@ impl TestTopicCreateCmd {
     fn new(
         stream_id: u32,
         stream_name: String,
-        topic_id: Option<u32>,
         topic_name: String,
         partitions_count: u32,
         compression_algorithm: CompressionAlgorithm,
@@ -63,7 +61,6 @@ impl TestTopicCreateCmd {
         Self {
             stream_id,
             stream_name,
-            topic_id,
             topic_name,
             partitions_count,
             compression_algorithm,
@@ -188,7 +185,6 @@ pub async fn should_be_successful() {
         .execute_test(TestTopicCreateCmd::new(
             0,
             String::from("main"),
-            None,
             String::from("sync"),
             1,
             Default::default(),
@@ -202,7 +198,6 @@ pub async fn should_be_successful() {
         .execute_test(TestTopicCreateCmd::new(
             1,
             String::from("testing"),
-            None,
             String::from("topic"),
             5,
             Default::default(),
@@ -216,7 +211,6 @@ pub async fn should_be_successful() {
         .execute_test(TestTopicCreateCmd::new(
             2,
             String::from("prod"),
-            None,
             String::from("named"),
             1,
             Default::default(),
@@ -230,7 +224,6 @@ pub async fn should_be_successful() {
         .execute_test(TestTopicCreateCmd::new(
             3,
             String::from("big"),
-            None,
             String::from("probe"),
             2,
             Default::default(),
diff --git a/core/integration/tests/cli/topic/test_topic_list_command.rs b/core/integration/tests/cli/topic/test_topic_list_command.rs
index 0f394677..4f30123e 100644
--- a/core/integration/tests/cli/topic/test_topic_list_command.rs
+++ b/core/integration/tests/cli/topic/test_topic_list_command.rs
@@ -31,7 +31,6 @@ use serial_test::parallel;
 struct TestTopicListCmd {
     stream_id: u32,
     stream_name: String,
-    topic_id: u32,
     topic_name: String,
     using_stream_id: TestStreamId,
     output: OutputFormat,
@@ -41,7 +40,6 @@ impl TestTopicListCmd {
     fn new(
         stream_id: u32,
         stream_name: String,
-        topic_id: u32,
         topic_name: String,
         using_stream_id: TestStreamId,
         output: OutputFormat,
@@ -49,7 +47,6 @@ impl TestTopicListCmd {
         Self {
             stream_id,
             stream_name,
-            topic_id,
             topic_name,
             using_stream_id,
             output,
@@ -137,7 +134,6 @@ pub async fn should_be_successful() {
         .execute_test(TestTopicListCmd::new(
             0,
             String::from("main"),
-            0,
             String::from("sync"),
             TestStreamId::Named,
             OutputFormat::Default,
@@ -147,7 +143,6 @@ pub async fn should_be_successful() {
         .execute_test(TestTopicListCmd::new(
             1,
             String::from("customer"),
-            0,
             String::from("topic"),
             TestStreamId::Named,
             OutputFormat::List,
@@ -157,7 +152,6 @@ pub async fn should_be_successful() {
         .execute_test(TestTopicListCmd::new(
             2,
             String::from("production"),
-            0,
             String::from("data"),
             TestStreamId::Named,
             OutputFormat::Table,
diff --git a/core/integration/tests/streaming/mod.rs b/core/integration/tests/streaming/mod.rs
index d6f55a84..6783bbbf 100644
--- a/core/integration/tests/streaming/mod.rs
+++ b/core/integration/tests/streaming/mod.rs
@@ -129,4 +129,4 @@ async fn bootstrap_test_environment(
         partition_id: 0,
         task_registry,
     })
-}
\ No newline at end of file
+}
diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs
index 502602d3..2980726e 100644
--- a/core/server/src/binary/sender.rs
+++ b/core/server/src/binary/sender.rs
@@ -16,8 +16,6 @@
  * under the License.
  */
 
-use std::future::Future;
-
 use crate::streaming::utils::PooledBuffer;
 use crate::tcp::tcp_sender::TcpSender;
 use crate::tcp::tcp_tls_sender::TcpTlsSender;
@@ -27,6 +25,7 @@ use compio::net::TcpStream;
 use compio_quic::{RecvStream, SendStream};
 use compio_tls::TlsStream;
 use iggy_common::IggyError;
+use std::future::Future;
 
 macro_rules! forward_async_methods {
     (
@@ -68,6 +67,7 @@ pub trait Sender {
     fn shutdown(&mut self) -> impl Future<Output = Result<(), ServerError>>;
 }
 
+#[allow(clippy::large_enum_variant)]
 pub enum SenderKind {
     Tcp(TcpSender),
     TcpTls(TcpTlsSender),
diff --git a/core/server/src/http/http_shard_wrapper.rs b/core/server/src/http/http_shard_wrapper.rs
index bafe4f60..62ba1a24 100644
--- a/core/server/src/http/http_shard_wrapper.rs
+++ b/core/server/src/http/http_shard_wrapper.rs
@@ -250,6 +250,7 @@ impl HttpSafeShard {
         self.shard().get_stats().await
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub async fn poll_messages(
         &self,
         client_id: u32,
diff --git a/core/server/src/http/partitions.rs b/core/server/src/http/partitions.rs
index 70b21ed5..123af4fb 100644
--- a/core/server/src/http/partitions.rs
+++ b/core/server/src/http/partitions.rs
@@ -23,7 +23,6 @@ use crate::http::shared::AppState;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
-use crate::streaming::{streams, topics};
 use axum::extract::{Path, Query, State};
 use axum::http::StatusCode;
 use axum::routing::post;
@@ -76,41 +75,6 @@ async fn create_partitions(
             partitions,
         };
         let _responses = shard.broadcast_event_to_all_shards(event).await;
-
-        let numeric_stream_id = shard
-            .streams2
-            .with_stream_by_id(&command.stream_id, streams::helpers::get_stream_id());
-        let numeric_topic_id = shard.streams2.with_topic_by_id(
-            &command.stream_id,
-            &command.topic_id,
-            topics::helpers::get_topic_id(),
-        );
-
-        // TODO: Replace with new mechanism
-        /*
-        let records = shard
-            .create_shard_table_records(&partition_ids, numeric_stream_id, numeric_topic_id)
-            .collect::<Vec<_>>();
-
-        for (ns, shard_info) in records.iter() {
-            let partition = topic.get_partition(ns.partition_id)?;
-            let mut partition = partition.write().await;
-            partition.persist().await?;
-            if shard_info.id() == shard.id {
-                partition.open().await?;
-            }
-        }
-
-        shard.insert_shard_table_records(records);
-
-        let event = ShardEvent::CreatedShardTableRecords {
-            stream_id: numeric_stream_id,
-            topic_id: numeric_topic_id,
-            partition_ids: partition_ids.clone(),
-        };
-        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
-        */
-
         Ok::<(), CustomError>(())
     });
 
@@ -160,8 +124,6 @@ async fn delete_partitions(
         )
     })?;
 
-    // Broadcast event.
-
     let command = EntryCommand::DeletePartitions(DeletePartitions {
         stream_id: query.stream_id.clone(),
         topic_id: query.topic_id.clone(),
@@ -178,4 +140,4 @@ async fn delete_partitions(
         })?;
 
     Ok(StatusCode::NO_CONTENT)
-}
+}
\ No newline at end of file
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 896c202b..4bebc36c 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -982,7 +982,7 @@ impl IggyShard {
                 }
 
                 Ok(())
-            },
+            }
             ShardEvent::JoinedConsumerGroup {
                 client_id,
                 stream_id,
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 437c163e..efef5a1c 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -167,6 +167,7 @@ impl IggyShard {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub async fn poll_messages(
         &self,
         client_id: u32,
diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs
index 0f742b2c..ca633cbe 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -25,8 +25,6 @@ use crate::shard_info;
 use crate::slab::traits_ext::EntityMarker;
 use crate::slab::traits_ext::IntoComponents;
 use crate::streaming::partitions;
-use crate::streaming::partitions::journal::MemoryMessageJournal;
-use crate::streaming::partitions::log::SegmentedLog;
 use crate::streaming::partitions::partition2;
 use crate::streaming::partitions::storage2::create_partition_file_hierarchy;
 use crate::streaming::partitions::storage2::delete_partitions_from_disk;
@@ -35,7 +33,6 @@ use crate::streaming::segments::storage::create_segment_storage;
 use crate::streaming::session::Session;
 use crate::streaming::streams;
 use crate::streaming::topics;
-
 use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
@@ -105,7 +102,6 @@ impl IggyShard {
 
         let shards_count = self.get_available_shards_count();
         for (partition_id, stats) in partitions.iter().map(|p| (p.id(), p.stats())) {
-            // TODO: Create shard table recordsj.
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id);
             let shard_id = calculate_shard_assignment(&ns, shards_count);
             let shard_info = ShardInfo::new(shard_id);
@@ -266,7 +262,7 @@ impl IggyShard {
         let mut total_size_bytes = 0;
 
         for partition in partitions {
-            let (root, stats, _, _, _, _, mut log) = partition.into_components();
+            let (root, stats, _, _, _, _, _) = partition.into_components();
             let partition_id = root.id();
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id);
             self.remove_shard_table_record(&ns);
diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs
index b2679f07..a0d855c4 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -108,7 +108,7 @@ impl IggyShard {
     }
 
     pub fn delete_stream2_bypass_auth(&self, id: &Identifier) -> stream2::Stream {
-       self.delete_stream2_base(id)
+        self.delete_stream2_base(id)
     }
 
     fn delete_stream2_base(&self, id: &Identifier) -> stream2::Stream {
diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs
index fbf583cd..ed8c348d 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -21,16 +21,12 @@ use crate::shard::IggyShard;
 use crate::shard_info;
 use crate::slab::traits_ext::{EntityComponentSystem, EntityMarker, InsertCell, IntoComponents};
 use crate::streaming::session::Session;
-use crate::streaming::stats::{StreamStats, TopicStats};
 use crate::streaming::topics::storage2::{create_topic_file_hierarchy, delete_topic_from_disk};
 use crate::streaming::topics::topic2::{self};
 use crate::streaming::{partitions, streams, topics};
 use error_set::ErrContext;
-use iggy_common::{
-    CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize,
-};
+use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize};
 use std::str::FromStr;
-use std::sync::Arc;
 
 impl IggyShard {
     #[allow(clippy::too_many_arguments)]
diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs
index c13d0cbf..aef6754b 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -136,7 +136,7 @@ impl IggyShard {
     ) -> Result<User, IggyError> {
         self.ensure_authenticated(session)?;
         self.permissioner
-        .borrow()
+            .borrow()
             .create_user(session.get_user_id())
             .with_error_context(|error| {
                 format!(
@@ -521,7 +521,8 @@ impl IggyShard {
     }
 
     fn logout_user_base(&self, user_id: u32, client_id: u32) -> Result<(), IggyError> {
-        let user = self
+        // TODO(hubcio): not sure if user is needed here
+        let _user = self
             .get_user(&Identifier::numeric(user_id)?)
             .with_error_context(|error| {
                 format!(
diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs
index a57417e5..79c30edf 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -13,6 +13,7 @@ use iggy_common::{
 };
 use std::net::SocketAddr;
 
+#[allow(clippy::large_enum_variant)]
 #[derive(Debug, Clone)]
 pub enum ShardEvent {
     FlushUnsavedBuffer {
diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs
index 14bf075b..bf6537eb 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -25,12 +25,14 @@ use crate::{
 };
 use iggy_common::Identifier;
 
+#[allow(clippy::large_enum_variant)]
 pub enum ShardSendRequestResult {
     // TODO: In the future we can add other variants, for example backpressure from the destination shard,
     Recoil(ShardMessage),
     Response(ShardResponse),
 }
 
+#[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 pub enum ShardMessage {
     Request(ShardRequest),
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 386c7e56..694d925b 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1306,10 +1306,7 @@ impl Streams {
             stream_id,
             topic_id,
             partition_id,
-            streaming_partitions::helpers::update_index_and_increment_stats(
-                saved,
-                config,
-            ),
+            streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
         );
 
         Ok(batch_count)
diff --git a/core/server/src/state/mod.rs b/core/server/src/state/mod.rs
index 6ea022a6..2bf2c6bf 100644
--- a/core/server/src/state/mod.rs
+++ b/core/server/src/state/mod.rs
@@ -32,6 +32,9 @@ pub mod system;
 
 pub const COMPONENT: &str = "STATE";
 
+// TODO(hubcio): I don't like this approach, we should avoid mocking and simplify it all.
+
+#[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 pub enum StateKind {
     File(file::FileState),
diff --git a/core/server/src/streaming/partitions/partition2.rs b/core/server/src/streaming/partitions/partition2.rs
index 087d23b0..1b5480a5 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -95,7 +95,7 @@ pub struct Partition {
 }
 
 impl Partition {
-    #[allow(clippy::too_many_arguments)] 
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         created_at: IggyTimestamp,
         should_increment_offset: bool,
diff --git a/core/server/src/streaming/partitions/storage2.rs b/core/server/src/streaming/partitions/storage2.rs
index 63e5cb65..42ff8698 100644
--- a/core/server/src/streaming/partitions/storage2.rs
+++ b/core/server/src/streaming/partitions/storage2.rs
@@ -1,11 +1,7 @@
 use super::COMPONENT;
 use crate::{
-    configs::system::SystemConfig,
-    io::fs_utils::remove_dir_all,
-    shard_error, shard_info, shard_trace,
-    streaming::partitions::{
-        consumer_offset::ConsumerOffset,
-    },
+    configs::system::SystemConfig, io::fs_utils::remove_dir_all, shard_error, shard_info,
+    shard_trace, streaming::partitions::consumer_offset::ConsumerOffset,
 };
 use compio::{
     fs::{self, OpenOptions, create_dir_all},
@@ -111,7 +107,6 @@ pub async fn delete_partitions_from_disk(
     partition_id: usize,
     config: &SystemConfig,
 ) -> Result<(), IggyError> {
-
     let partition_path = config.get_partition_path(stream_id, topic_id, partition_id);
     remove_dir_all(&partition_path).await.map_err(|_| {
         IggyError::CannotDeletePartitionDirectory(
diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs
index 976ffad9..1ddc0454 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -252,4 +252,4 @@ fn mimic_members(members: &Slab<Member>) -> Slab<Member> {
         Member::new(member.client_id).insert_into(&mut container);
     }
     container
-}
\ No newline at end of file
+}
diff --git a/core/server/src/streaming/topics/storage2.rs b/core/server/src/streaming/topics/storage2.rs
index b166bc96..c412d1dd 100644
--- a/core/server/src/streaming/topics/storage2.rs
+++ b/core/server/src/streaming/topics/storage2.rs
@@ -68,14 +68,7 @@ pub async fn delete_topic_from_disk(
         let partition = partitions.delete(id);
         let (root, stats, _, _, _, _, _log) = partition.into_components();
         let partition_id = root.id();
-        delete_partitions_from_disk(
-            shard_id,
-            stream_id,
-            topic_id,
-            partition_id,
-            config,
-        )
-        .await?;
+        delete_partitions_from_disk(shard_id, stream_id, topic_id, partition_id, config).await?;
         messages_count += stats.messages_count_inconsistent();
         size_bytes += stats.size_bytes_inconsistent();
         segments_count += stats.segments_count_inconsistent();

Reply via email to