This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new f21d85f4 feat(io_uring): fix rest of lints and tests (#2242)
f21d85f4 is described below
commit f21d85f45fc6dd54b31f66c64c3201a24ebaa92d
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Oct 6 17:44:06 2025 +0200
feat(io_uring): fix rest of lints and tests (#2242)
Co-authored-by: numminex <[email protected]>
---
.../tests/cli/stream/test_stream_purge_command.rs | 2 --
.../tests/cli/topic/test_topic_create_command.rs | 7 ----
.../tests/cli/topic/test_topic_list_command.rs | 6 ----
core/integration/tests/streaming/mod.rs | 2 +-
core/server/src/binary/sender.rs | 4 +--
core/server/src/http/http_shard_wrapper.rs | 1 +
core/server/src/http/partitions.rs | 40 +---------------------
core/server/src/shard/mod.rs | 2 +-
core/server/src/shard/system/messages.rs | 1 +
core/server/src/shard/system/partitions.rs | 6 +---
core/server/src/shard/system/streams.rs | 2 +-
core/server/src/shard/system/topics.rs | 6 +---
core/server/src/shard/system/users.rs | 5 +--
core/server/src/shard/transmission/event.rs | 1 +
core/server/src/shard/transmission/message.rs | 2 ++
core/server/src/slab/streams.rs | 5 +--
core/server/src/state/mod.rs | 3 ++
core/server/src/streaming/partitions/partition2.rs | 2 +-
core/server/src/streaming/partitions/storage2.rs | 9 ++---
core/server/src/streaming/topics/helpers.rs | 2 +-
core/server/src/streaming/topics/storage2.rs | 9 +----
21 files changed, 25 insertions(+), 92 deletions(-)
diff --git a/core/integration/tests/cli/stream/test_stream_purge_command.rs b/core/integration/tests/cli/stream/test_stream_purge_command.rs
index 78f897f2..571f5add 100644
--- a/core/integration/tests/cli/stream/test_stream_purge_command.rs
+++ b/core/integration/tests/cli/stream/test_stream_purge_command.rs
@@ -31,7 +31,6 @@ struct TestStreamPurgeCmd {
stream_id: u32,
stream_name: String,
using_identifier: TestStreamId,
- topic_id: u32,
topic_name: String,
}
@@ -41,7 +40,6 @@ impl TestStreamPurgeCmd {
stream_id,
stream_name: name,
using_identifier,
- topic_id: 0,
topic_name: String::from("test_topic"),
}
}
diff --git a/core/integration/tests/cli/topic/test_topic_create_command.rs b/core/integration/tests/cli/topic/test_topic_create_command.rs
index 96e100d4..cb1718f9 100644
--- a/core/integration/tests/cli/topic/test_topic_create_command.rs
+++ b/core/integration/tests/cli/topic/test_topic_create_command.rs
@@ -36,7 +36,6 @@ use std::time::Duration;
struct TestTopicCreateCmd {
stream_id: u32,
stream_name: String,
- topic_id: Option<u32>,
topic_name: String,
partitions_count: u32,
compression_algorithm: CompressionAlgorithm,
@@ -51,7 +50,6 @@ impl TestTopicCreateCmd {
fn new(
stream_id: u32,
stream_name: String,
- topic_id: Option<u32>,
topic_name: String,
partitions_count: u32,
compression_algorithm: CompressionAlgorithm,
@@ -63,7 +61,6 @@ impl TestTopicCreateCmd {
Self {
stream_id,
stream_name,
- topic_id,
topic_name,
partitions_count,
compression_algorithm,
@@ -188,7 +185,6 @@ pub async fn should_be_successful() {
.execute_test(TestTopicCreateCmd::new(
0,
String::from("main"),
- None,
String::from("sync"),
1,
Default::default(),
@@ -202,7 +198,6 @@ pub async fn should_be_successful() {
.execute_test(TestTopicCreateCmd::new(
1,
String::from("testing"),
- None,
String::from("topic"),
5,
Default::default(),
@@ -216,7 +211,6 @@ pub async fn should_be_successful() {
.execute_test(TestTopicCreateCmd::new(
2,
String::from("prod"),
- None,
String::from("named"),
1,
Default::default(),
@@ -230,7 +224,6 @@ pub async fn should_be_successful() {
.execute_test(TestTopicCreateCmd::new(
3,
String::from("big"),
- None,
String::from("probe"),
2,
Default::default(),
diff --git a/core/integration/tests/cli/topic/test_topic_list_command.rs b/core/integration/tests/cli/topic/test_topic_list_command.rs
index 0f394677..4f30123e 100644
--- a/core/integration/tests/cli/topic/test_topic_list_command.rs
+++ b/core/integration/tests/cli/topic/test_topic_list_command.rs
@@ -31,7 +31,6 @@ use serial_test::parallel;
struct TestTopicListCmd {
stream_id: u32,
stream_name: String,
- topic_id: u32,
topic_name: String,
using_stream_id: TestStreamId,
output: OutputFormat,
@@ -41,7 +40,6 @@ impl TestTopicListCmd {
fn new(
stream_id: u32,
stream_name: String,
- topic_id: u32,
topic_name: String,
using_stream_id: TestStreamId,
output: OutputFormat,
@@ -49,7 +47,6 @@ impl TestTopicListCmd {
Self {
stream_id,
stream_name,
- topic_id,
topic_name,
using_stream_id,
output,
@@ -137,7 +134,6 @@ pub async fn should_be_successful() {
.execute_test(TestTopicListCmd::new(
0,
String::from("main"),
- 0,
String::from("sync"),
TestStreamId::Named,
OutputFormat::Default,
@@ -147,7 +143,6 @@ pub async fn should_be_successful() {
.execute_test(TestTopicListCmd::new(
1,
String::from("customer"),
- 0,
String::from("topic"),
TestStreamId::Named,
OutputFormat::List,
@@ -157,7 +152,6 @@ pub async fn should_be_successful() {
.execute_test(TestTopicListCmd::new(
2,
String::from("production"),
- 0,
String::from("data"),
TestStreamId::Named,
OutputFormat::Table,
diff --git a/core/integration/tests/streaming/mod.rs b/core/integration/tests/streaming/mod.rs
index d6f55a84..6783bbbf 100644
--- a/core/integration/tests/streaming/mod.rs
+++ b/core/integration/tests/streaming/mod.rs
@@ -129,4 +129,4 @@ async fn bootstrap_test_environment(
partition_id: 0,
task_registry,
})
-}
\ No newline at end of file
+}
diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs
index 502602d3..2980726e 100644
--- a/core/server/src/binary/sender.rs
+++ b/core/server/src/binary/sender.rs
@@ -16,8 +16,6 @@
* under the License.
*/
-use std::future::Future;
-
use crate::streaming::utils::PooledBuffer;
use crate::tcp::tcp_sender::TcpSender;
use crate::tcp::tcp_tls_sender::TcpTlsSender;
@@ -27,6 +25,7 @@ use compio::net::TcpStream;
use compio_quic::{RecvStream, SendStream};
use compio_tls::TlsStream;
use iggy_common::IggyError;
+use std::future::Future;
macro_rules! forward_async_methods {
(
@@ -68,6 +67,7 @@ pub trait Sender {
fn shutdown(&mut self) -> impl Future<Output = Result<(), ServerError>>;
}
+#[allow(clippy::large_enum_variant)]
pub enum SenderKind {
Tcp(TcpSender),
TcpTls(TcpTlsSender),
diff --git a/core/server/src/http/http_shard_wrapper.rs b/core/server/src/http/http_shard_wrapper.rs
index bafe4f60..62ba1a24 100644
--- a/core/server/src/http/http_shard_wrapper.rs
+++ b/core/server/src/http/http_shard_wrapper.rs
@@ -250,6 +250,7 @@ impl HttpSafeShard {
self.shard().get_stats().await
}
+ #[allow(clippy::too_many_arguments)]
pub async fn poll_messages(
&self,
client_id: u32,
diff --git a/core/server/src/http/partitions.rs b/core/server/src/http/partitions.rs
index 70b21ed5..123af4fb 100644
--- a/core/server/src/http/partitions.rs
+++ b/core/server/src/http/partitions.rs
@@ -23,7 +23,6 @@ use crate::http::shared::AppState;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
-use crate::streaming::{streams, topics};
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::routing::post;
@@ -76,41 +75,6 @@ async fn create_partitions(
partitions,
};
let _responses = shard.broadcast_event_to_all_shards(event).await;
-
- let numeric_stream_id = shard
- .streams2
- .with_stream_by_id(&command.stream_id, streams::helpers::get_stream_id());
- let numeric_topic_id = shard.streams2.with_topic_by_id(
- &command.stream_id,
- &command.topic_id,
- topics::helpers::get_topic_id(),
- );
-
- // TODO: Replace with new mechanism
- /*
- let records = shard
- .create_shard_table_records(&partition_ids, numeric_stream_id, numeric_topic_id)
- .collect::<Vec<_>>();
-
- for (ns, shard_info) in records.iter() {
- let partition = topic.get_partition(ns.partition_id)?;
- let mut partition = partition.write().await;
- partition.persist().await?;
- if shard_info.id() == shard.id {
- partition.open().await?;
- }
- }
-
- shard.insert_shard_table_records(records);
-
- let event = ShardEvent::CreatedShardTableRecords {
- stream_id: numeric_stream_id,
- topic_id: numeric_topic_id,
- partition_ids: partition_ids.clone(),
- };
- let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
- */
-
Ok::<(), CustomError>(())
});
@@ -160,8 +124,6 @@ async fn delete_partitions(
)
})?;
- // Broadcast event.
-
let command = EntryCommand::DeletePartitions(DeletePartitions {
stream_id: query.stream_id.clone(),
topic_id: query.topic_id.clone(),
@@ -178,4 +140,4 @@ async fn delete_partitions(
})?;
Ok(StatusCode::NO_CONTENT)
-}
+}
\ No newline at end of file
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 896c202b..4bebc36c 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -982,7 +982,7 @@ impl IggyShard {
}
Ok(())
- },
+ }
ShardEvent::JoinedConsumerGroup {
client_id,
stream_id,
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 437c163e..efef5a1c 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -167,6 +167,7 @@ impl IggyShard {
Ok(())
}
+ #[allow(clippy::too_many_arguments)]
pub async fn poll_messages(
&self,
client_id: u32,
diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs
index 0f742b2c..ca633cbe 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -25,8 +25,6 @@ use crate::shard_info;
use crate::slab::traits_ext::EntityMarker;
use crate::slab::traits_ext::IntoComponents;
use crate::streaming::partitions;
-use crate::streaming::partitions::journal::MemoryMessageJournal;
-use crate::streaming::partitions::log::SegmentedLog;
use crate::streaming::partitions::partition2;
use crate::streaming::partitions::storage2::create_partition_file_hierarchy;
use crate::streaming::partitions::storage2::delete_partitions_from_disk;
@@ -35,7 +33,6 @@ use crate::streaming::segments::storage::create_segment_storage;
use crate::streaming::session::Session;
use crate::streaming::streams;
use crate::streaming::topics;
-
use error_set::ErrContext;
use iggy_common::Identifier;
use iggy_common::IggyError;
@@ -105,7 +102,6 @@ impl IggyShard {
let shards_count = self.get_available_shards_count();
for (partition_id, stats) in partitions.iter().map(|p| (p.id(), p.stats())) {
- // TODO: Create shard table recordsj.
let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id);
let shard_id = calculate_shard_assignment(&ns, shards_count);
let shard_info = ShardInfo::new(shard_id);
@@ -266,7 +262,7 @@ impl IggyShard {
let mut total_size_bytes = 0;
for partition in partitions {
- let (root, stats, _, _, _, _, mut log) = partition.into_components();
+ let (root, stats, _, _, _, _, _) = partition.into_components();
let partition_id = root.id();
let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id);
self.remove_shard_table_record(&ns);
diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs
index b2679f07..a0d855c4 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -108,7 +108,7 @@ impl IggyShard {
}
pub fn delete_stream2_bypass_auth(&self, id: &Identifier) -> stream2::Stream {
- self.delete_stream2_base(id)
+ self.delete_stream2_base(id)
}
fn delete_stream2_base(&self, id: &Identifier) -> stream2::Stream {
diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs
index fbf583cd..ed8c348d 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -21,16 +21,12 @@ use crate::shard::IggyShard;
use crate::shard_info;
use crate::slab::traits_ext::{EntityComponentSystem, EntityMarker, InsertCell, IntoComponents};
use crate::streaming::session::Session;
-use crate::streaming::stats::{StreamStats, TopicStats};
use crate::streaming::topics::storage2::{create_topic_file_hierarchy, delete_topic_from_disk};
use crate::streaming::topics::topic2::{self};
use crate::streaming::{partitions, streams, topics};
use error_set::ErrContext;
-use iggy_common::{
- CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize,
-};
+use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize};
use std::str::FromStr;
-use std::sync::Arc;
impl IggyShard {
#[allow(clippy::too_many_arguments)]
diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs
index c13d0cbf..aef6754b 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -136,7 +136,7 @@ impl IggyShard {
) -> Result<User, IggyError> {
self.ensure_authenticated(session)?;
self.permissioner
- .borrow()
+ .borrow()
.create_user(session.get_user_id())
.with_error_context(|error| {
format!(
@@ -521,7 +521,8 @@ impl IggyShard {
}
fn logout_user_base(&self, user_id: u32, client_id: u32) -> Result<(), IggyError> {
- let user = self
+ // TODO(hubcio): not sure if user is needed here
+ let _user = self
.get_user(&Identifier::numeric(user_id)?)
.with_error_context(|error| {
format!(
diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs
index a57417e5..79c30edf 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -13,6 +13,7 @@ use iggy_common::{
};
use std::net::SocketAddr;
+#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum ShardEvent {
FlushUnsavedBuffer {
diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs
index 14bf075b..bf6537eb 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -25,12 +25,14 @@ use crate::{
};
use iggy_common::Identifier;
+#[allow(clippy::large_enum_variant)]
pub enum ShardSendRequestResult {
// TODO: In the future we can add other variants, for example backpressure from the destination shard,
Recoil(ShardMessage),
Response(ShardResponse),
}
+#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum ShardMessage {
Request(ShardRequest),
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 386c7e56..694d925b 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1306,10 +1306,7 @@ impl Streams {
stream_id,
topic_id,
partition_id,
- streaming_partitions::helpers::update_index_and_increment_stats(
- saved,
- config,
- ),
+ streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
);
Ok(batch_count)
diff --git a/core/server/src/state/mod.rs b/core/server/src/state/mod.rs
index 6ea022a6..2bf2c6bf 100644
--- a/core/server/src/state/mod.rs
+++ b/core/server/src/state/mod.rs
@@ -32,6 +32,9 @@ pub mod system;
pub const COMPONENT: &str = "STATE";
+// TODO(hubcio): I don't like this approach, we should avoid mocking and simplify it all.
+
+#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum StateKind {
File(file::FileState),
diff --git a/core/server/src/streaming/partitions/partition2.rs b/core/server/src/streaming/partitions/partition2.rs
index 087d23b0..1b5480a5 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -95,7 +95,7 @@ pub struct Partition {
}
impl Partition {
- #[allow(clippy::too_many_arguments)]
+ #[allow(clippy::too_many_arguments)]
pub fn new(
created_at: IggyTimestamp,
should_increment_offset: bool,
diff --git a/core/server/src/streaming/partitions/storage2.rs b/core/server/src/streaming/partitions/storage2.rs
index 63e5cb65..42ff8698 100644
--- a/core/server/src/streaming/partitions/storage2.rs
+++ b/core/server/src/streaming/partitions/storage2.rs
@@ -1,11 +1,7 @@
use super::COMPONENT;
use crate::{
- configs::system::SystemConfig,
- io::fs_utils::remove_dir_all,
- shard_error, shard_info, shard_trace,
- streaming::partitions::{
- consumer_offset::ConsumerOffset,
- },
+ configs::system::SystemConfig, io::fs_utils::remove_dir_all, shard_error, shard_info,
+ shard_trace, streaming::partitions::consumer_offset::ConsumerOffset,
};
use compio::{
fs::{self, OpenOptions, create_dir_all},
@@ -111,7 +107,6 @@ pub async fn delete_partitions_from_disk(
partition_id: usize,
config: &SystemConfig,
) -> Result<(), IggyError> {
-
let partition_path = config.get_partition_path(stream_id, topic_id, partition_id);
remove_dir_all(&partition_path).await.map_err(|_| {
IggyError::CannotDeletePartitionDirectory(
diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs
index 976ffad9..1ddc0454 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -252,4 +252,4 @@ fn mimic_members(members: &Slab<Member>) -> Slab<Member> {
Member::new(member.client_id).insert_into(&mut container);
}
container
-}
\ No newline at end of file
+}
diff --git a/core/server/src/streaming/topics/storage2.rs b/core/server/src/streaming/topics/storage2.rs
index b166bc96..c412d1dd 100644
--- a/core/server/src/streaming/topics/storage2.rs
+++ b/core/server/src/streaming/topics/storage2.rs
@@ -68,14 +68,7 @@ pub async fn delete_topic_from_disk(
let partition = partitions.delete(id);
let (root, stats, _, _, _, _, _log) = partition.into_components();
let partition_id = root.id();
- delete_partitions_from_disk(
- shard_id,
- stream_id,
- topic_id,
- partition_id,
- config,
- )
- .await?;
+ delete_partitions_from_disk(shard_id, stream_id, topic_id, partition_id, config).await?;
messages_count += stats.messages_count_inconsistent();
size_bytes += stats.size_bytes_inconsistent();
segments_count += stats.segments_count_inconsistent();