This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc_compio
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc_compio by this push:
     new ce064f53 fix descriptors
ce064f53 is described below

commit ce064f53b5a4b67e9dbb568f4433489b1d391357
Author: numinex <[email protected]>
AuthorDate: Fri Jul 4 15:24:17 2025 +0200

    fix descriptors
---
 .../binary/handlers/topics/create_topic_handler.rs |  3 +-
 core/server/src/shard/mod.rs                       |  9 +++++-
 core/server/src/shard/system/topics.rs             | 35 ++++++++++------------
 core/server/src/shard/transmission/event.rs        |  3 +-
 4 files changed, 28 insertions(+), 22 deletions(-)

diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 6785e4f0..51088b55 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -48,7 +48,7 @@ impl ServerCommandHandler for CreateTopic {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id;
-        let created_topic_id = shard
+        let (shards_assignment, created_topic_id) = shard
                 .create_topic(
                     session,
                     &self.stream_id,
@@ -72,6 +72,7 @@ impl ServerCommandHandler for CreateTopic {
             compression_algorithm: self.compression_algorithm,
             max_topic_size: self.max_topic_size,
             replication_factor: self.replication_factor,
+            shards_assignment,
         };
         // Broadcast the event to all shards.
         let _responses = shard.broadcast_event_to_all_shards(event.into());
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 29a1d51b..2178d2b3 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -110,6 +110,7 @@ impl Shard {
     }
 }
 
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
 pub struct ShardInfo {
     id: u16,
 }
@@ -118,6 +119,10 @@ impl ShardInfo {
     pub fn new(id: u16) -> Self {
         Self { id }
     }
+
+    pub fn id(&self) -> u16 {
+        self.id
+    }
 }
 
 pub struct IggyShard {
@@ -519,6 +524,7 @@ impl IggyShard {
                 compression_algorithm,
                 max_topic_size,
                 replication_factor,
+                shards_assignment,
             } => {
                 self.create_topic_bypass_auth(
                     stream_id,
@@ -529,6 +535,7 @@ impl IggyShard {
                     *compression_algorithm,
                     *max_topic_size,
                     *replication_factor,
+                    shards_assignment.clone()
                 )
                 .await
             }
@@ -606,7 +613,7 @@ impl IggyShard {
 
     pub fn insert_shard_table_records(
         &self,
-        records: impl Iterator<Item = (IggyNamespace, ShardInfo)>,
+        records: impl IntoIterator<Item = (IggyNamespace, ShardInfo)>,
     ) {
         self.shards_table.borrow_mut().extend(records);
     }
diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs
index 0f8155fb..d71b0f1d 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -112,8 +112,9 @@ impl IggyShard {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
+        shards_assignment: Vec<(IggyNamespace, ShardInfo)>,
     ) -> Result<(), IggyError> {
-        let (topic_id, partition_ids) = self.create_topic_base(
+        let (topic_id, _) = self.create_topic_base(
             stream_id,
             topic_id,
             name,
@@ -132,22 +133,16 @@ impl IggyShard {
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}")
             })?;
-        topic.persist().await.with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}")
-        })?;
 
-        // TODO: Figure out a way how to distribute the shards table among different shards,
-        // without the need to do code from below, everytime we handle a `ShardEvent`.
-        // I think we shouldn't be sharing it tho and still maintain a single shard table per shard,
-        // but find a way how to distribute it smarter (maybe move the broadcast inside of the `insert_shard_table_records`) method.
-        let records = partition_ids.into_iter().map(|partition_id| {
-            let namespace = IggyNamespace::new(stream_id, topic_id, partition_id);
-            let hash = namespace.generate_hash();
-            let shard_id = hash % self.get_available_shards_count();
-            let shard_info = ShardInfo::new(shard_id as u16);
-            (namespace, shard_info)
-        });
-        self.insert_shard_table_records(records);
+        for (_, shard_info) in &shards_assignment {
+            if shard_info.id() == self.id {
+                topic.persist().await.with_error_context(|error| {
+                    format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}")
+                })?;
+            }
+        }
+
+        self.insert_shard_table_records(shards_assignment);
 
         self.metrics.increment_topics(1);
         self.metrics.increment_partitions(partitions_count);
@@ -167,7 +162,7 @@ impl IggyShard {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
-    ) -> Result<Identifier, IggyError> {
+    ) -> Result<(Vec<(IggyNamespace, ShardInfo)>, Identifier), IggyError> {
         self.ensure_authenticated(session)?;
         {
             let stream = self.get_stream(stream_id).with_error_context(|error| {
@@ -208,6 +203,7 @@ impl IggyShard {
             format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}")
         })?;
 
+        // TODO: Refactor
         let records = partition_ids.into_iter().map(|partition_id| {
             let namespace = IggyNamespace::new(stream_id, topic_id, partition_id);
             let hash = namespace.generate_hash();
@@ -215,13 +211,14 @@ impl IggyShard {
             let shard_info = ShardInfo::new(shard_id as u16);
             (namespace, shard_info)
         });
-        self.insert_shard_table_records(records);
+        let records = records.collect::<Vec<_>>();
+        self.insert_shard_table_records(records.clone());
 
         self.metrics.increment_topics(1);
         self.metrics.increment_partitions(partitions_count);
         self.metrics.increment_segments(partitions_count);
 
-        Ok(Identifier::numeric(topic_id)?)
+        Ok((records, Identifier::numeric(topic_id)?))
     }
 
     fn create_topic_base(
diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs
index 49794c3f..60a28dfc 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -2,7 +2,7 @@ use std::net::SocketAddr;
 
 use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
 
-use crate::streaming::clients::client_manager::Transport;
+use crate::{shard::{namespace::IggyNamespace, ShardInfo}, streaming::clients::client_manager::Transport};
 
 #[derive(Debug)]
 pub enum ShardEvent {
@@ -24,6 +24,7 @@ pub enum ShardEvent {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
+        shards_assignment: Vec<(IggyNamespace, ShardInfo)>,
     },
     //CreatedConsumerGroup(Identifier, Identifier, Option<u32>, String),
     //DeletedConsumerGroup(Identifier, Identifier, Identifier),

Reply via email to