This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc_compio
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc_compio by this
push:
new ce064f53 fix descriptors
ce064f53 is described below
commit ce064f53b5a4b67e9dbb568f4433489b1d391357
Author: numinex <[email protected]>
AuthorDate: Fri Jul 4 15:24:17 2025 +0200
fix descriptors
---
.../binary/handlers/topics/create_topic_handler.rs | 3 +-
core/server/src/shard/mod.rs | 9 +++++-
core/server/src/shard/system/topics.rs | 35 ++++++++++------------
core/server/src/shard/transmission/event.rs | 3 +-
4 files changed, 28 insertions(+), 22 deletions(-)
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 6785e4f0..51088b55 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -48,7 +48,7 @@ impl ServerCommandHandler for CreateTopic {
debug!("session: {session}, command: {self}");
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id;
- let created_topic_id = shard
+ let (shards_assignment, created_topic_id) = shard
.create_topic(
session,
&self.stream_id,
@@ -72,6 +72,7 @@ impl ServerCommandHandler for CreateTopic {
compression_algorithm: self.compression_algorithm,
max_topic_size: self.max_topic_size,
replication_factor: self.replication_factor,
+ shards_assignment,
};
// Broadcast the event to all shards.
let _responses = shard.broadcast_event_to_all_shards(event.into());
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 29a1d51b..2178d2b3 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -110,6 +110,7 @@ impl Shard {
}
}
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ShardInfo {
id: u16,
}
@@ -118,6 +119,10 @@ impl ShardInfo {
pub fn new(id: u16) -> Self {
Self { id }
}
+
+ pub fn id(&self) -> u16 {
+ self.id
+ }
}
pub struct IggyShard {
@@ -519,6 +524,7 @@ impl IggyShard {
compression_algorithm,
max_topic_size,
replication_factor,
+ shards_assignment,
} => {
self.create_topic_bypass_auth(
stream_id,
@@ -529,6 +535,7 @@ impl IggyShard {
*compression_algorithm,
*max_topic_size,
*replication_factor,
+ shards_assignment.clone()
)
.await
}
@@ -606,7 +613,7 @@ impl IggyShard {
pub fn insert_shard_table_records(
&self,
- records: impl Iterator<Item = (IggyNamespace, ShardInfo)>,
+ records: impl IntoIterator<Item = (IggyNamespace, ShardInfo)>,
) {
self.shards_table.borrow_mut().extend(records);
}
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index 0f8155fb..d71b0f1d 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -112,8 +112,9 @@ impl IggyShard {
compression_algorithm: CompressionAlgorithm,
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
+ shards_assignment: Vec<(IggyNamespace, ShardInfo)>,
) -> Result<(), IggyError> {
- let (topic_id, partition_ids) = self.create_topic_base(
+ let (topic_id, _) = self.create_topic_base(
stream_id,
topic_id,
name,
@@ -132,22 +133,16 @@ impl IggyShard {
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get topic
with ID: {topic_id} in stream with ID: {stream_id}")
})?;
- topic.persist().await.with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to persist topic:
{topic}")
- })?;
- // TODO: Figure out a way how to distribute the shards table among
different shards,
- // without the need to do code from below, everytime we handle a
`ShardEvent`.
- // I think we shouldn't be sharing it tho and still maintain a single
shard table per shard,
- // but find a way how to distribute it smarter (maybe move the
broadcast inside of the `insert_shard_table_records`) method.
- let records = partition_ids.into_iter().map(|partition_id| {
- let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
- let hash = namespace.generate_hash();
- let shard_id = hash % self.get_available_shards_count();
- let shard_info = ShardInfo::new(shard_id as u16);
- (namespace, shard_info)
- });
- self.insert_shard_table_records(records);
+ for (_, shard_info) in &shards_assignment {
+ if shard_info.id() == self.id {
+ topic.persist().await.with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to persist
topic: {topic}")
+ })?;
+ }
+ }
+
+ self.insert_shard_table_records(shards_assignment);
self.metrics.increment_topics(1);
self.metrics.increment_partitions(partitions_count);
@@ -167,7 +162,7 @@ impl IggyShard {
compression_algorithm: CompressionAlgorithm,
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
- ) -> Result<Identifier, IggyError> {
+ ) -> Result<(Vec<(IggyNamespace, ShardInfo)>, Identifier), IggyError> {
self.ensure_authenticated(session)?;
{
let stream = self.get_stream(stream_id).with_error_context(|error|
{
@@ -208,6 +203,7 @@ impl IggyShard {
format!("{COMPONENT} (error: {error}) - failed to persist topic:
{topic}")
})?;
+ // TODO: Refactor
let records = partition_ids.into_iter().map(|partition_id| {
let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
let hash = namespace.generate_hash();
@@ -215,13 +211,14 @@ impl IggyShard {
let shard_info = ShardInfo::new(shard_id as u16);
(namespace, shard_info)
});
- self.insert_shard_table_records(records);
+ let records = records.collect::<Vec<_>>();
+ self.insert_shard_table_records(records.clone());
self.metrics.increment_topics(1);
self.metrics.increment_partitions(partitions_count);
self.metrics.increment_segments(partitions_count);
- Ok(Identifier::numeric(topic_id)?)
+ Ok((records, Identifier::numeric(topic_id)?))
}
fn create_topic_base(
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
index 49794c3f..60a28dfc 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -2,7 +2,7 @@ use std::net::SocketAddr;
use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
-use crate::streaming::clients::client_manager::Transport;
+use crate::{shard::{namespace::IggyNamespace, ShardInfo},
streaming::clients::client_manager::Transport};
#[derive(Debug)]
pub enum ShardEvent {
@@ -24,6 +24,7 @@ pub enum ShardEvent {
compression_algorithm: CompressionAlgorithm,
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
+ shards_assignment: Vec<(IggyNamespace, ShardInfo)>,
},
//CreatedConsumerGroup(Identifier, Identifier, Option<u32>, String),
//DeletedConsumerGroup(Identifier, Identifier, Identifier),