hubcio commented on code in PR #2476:
URL: https://github.com/apache/iggy/pull/2476#discussion_r2618913259


##########
core/server/src/shard/handlers.rs:
##########
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::os::fd::FromRawFd;
+

Review Comment:
   Remove the empty line between `use` items, and fix the same thing in the other places where it occurs.



##########
core/server/src/binary/handlers/messages/send_messages_handler.rs:
##########
@@ -107,19 +110,105 @@ impl ServerCommandHandler for SendMessages {
         );
         batch.validate()?;
 
+        shard.ensure_topic_exists(&self.stream_id, &self.topic_id)?;
+
+        let numeric_stream_id = shard
+            .streams
+            .with_stream_by_id(&self.stream_id, 
streams::helpers::get_stream_id());
+
+        let numeric_topic_id = shard.streams.with_topic_by_id(
+            &self.stream_id,
+            &self.topic_id,
+            topics::helpers::get_topic_id(),
+        );
+
+        // TODO(tungtose): dry this code && get partition_id below have a side 
effect
+        let partition_id = shard.streams.with_topic_by_id(
+            &self.stream_id,
+            &self.topic_id,
+            |(root, auxilary, ..)| match self.partitioning.kind {
+                PartitioningKind::Balanced => {
+                    let upperbound = root.partitions().len();
+                    let pid = auxilary.get_next_partition_id(upperbound);
+                    Ok(pid)
+                }
+                PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
+                    self.partitioning.value[..self.partitioning.length as 
usize]
+                        .try_into()
+                        .map_err(|_| IggyError::InvalidNumberEncoding)?,
+                ) as usize),
+                PartitioningKind::MessagesKey => {
+                    let upperbound = root.partitions().len();
+                    Ok(
+                        
topics::helpers::calculate_partition_id_by_messages_key_hash(
+                            upperbound,
+                            &self.partitioning.value,
+                        ),
+                    )
+                }
+            },
+        )?;
+
+        let namespace = IggyNamespace::new(numeric_stream_id, 
numeric_topic_id, partition_id);
         let user_id = session.get_user_id();
+        let balanced_partitioning = matches!(self.partitioning.kind, 
PartitioningKind::Balanced);
+        let enabled_socket_migration = shard.config.tcp.socket_migration;
+
+        if enabled_socket_migration
+            && !(session.is_migrated() || balanced_partitioning)
+            && let Some(target_shard) = shard.find_shard(&namespace)
+            && target_shard.id != shard.id
+        {
+            info!(
+                "TCP wrong shared detected: from_shard {}, to_shard {}",
+                shard.id, target_shard.id
+            );
+
+            if let Some(fd) = sender.take_and_migrate_tcp() {
+                let payload = ShardRequestPayload::SocketTransfer {
+                    fd,
+                    from_shard: shard.id,
+                    client_id: session.client_id,
+                    user_id,
+                    address: session.ip_address,
+                    initial_data: batch,
+                };
+
+                let request = ShardRequest::new(
+                    self.stream_id.clone(),
+                    self.topic_id.clone(),
+                    partition_id,
+                    payload,
+                );
+
+                let socket_transfer_msg = ShardMessage::Request(request);
+
+                if let Err(err) = shard
+                    .send_request_to_shard_or_recoil(Some(&namespace), 
socket_transfer_msg)
+                    .await
+                {
+                    error!(
+                        "Failed to send socket transfer to shard {}, err: 
{:?}",
+                        target_shard.id, err
+                    );
+
+                    // TODO(tungtose): restore TcpStream to current shard or 
drop

Review Comment:
   I'd drop the connection.



##########
core/server/src/shard/handlers.rs:
##########
@@ -312,10 +319,80 @@ async fn handle_request(
             shard.broadcast_event_to_all_shards(event).await?;
             Ok(ShardResponse::DeleteStreamResponse(stream))
         }
+        ShardRequestPayload::SocketTransfer {
+            fd,
+            from_shard,
+            client_id,
+            user_id,
+            address,
+            initial_data,
+        } => {
+            info!(
+                "Received socket transfer msg, fd: {fd}, from_shard: 
{from_shard}, address: {address}"
+            );
+
+            // restore TcpStream from fd
+            let tcp_stream = unsafe { TcpStream::from_raw_fd(fd) };

Review Comment:
   Before reconstructing the `TcpStream`, check that `fd` is valid, e.g.
   `nix::sys::stat::fstat(fd).map_err(|e| IggyError::IoError(format!("Invalid fd: {}", e)))?;`



##########
core/server/src/streaming/session.rs:
##########
@@ -60,6 +62,14 @@ impl Session {
         self.active.set(false);
     }
 
+    pub fn is_migrated(&self) -> bool {

Review Comment:
   What's the intended behavior? Once migrated, can this session never migrate
again? Is that intentional? Please add a brief doc comment.



##########
core/server/src/binary/handlers/messages/send_messages_handler.rs:
##########
@@ -107,19 +110,105 @@ impl ServerCommandHandler for SendMessages {
         );
         batch.validate()?;
 
+        shard.ensure_topic_exists(&self.stream_id, &self.topic_id)?;
+
+        let numeric_stream_id = shard
+            .streams
+            .with_stream_by_id(&self.stream_id, 
streams::helpers::get_stream_id());
+
+        let numeric_topic_id = shard.streams.with_topic_by_id(
+            &self.stream_id,
+            &self.topic_id,
+            topics::helpers::get_topic_id(),
+        );
+
+        // TODO(tungtose): dry this code && get partition_id below have a side 
effect
+        let partition_id = shard.streams.with_topic_by_id(
+            &self.stream_id,
+            &self.topic_id,
+            |(root, auxilary, ..)| match self.partitioning.kind {
+                PartitioningKind::Balanced => {
+                    let upperbound = root.partitions().len();
+                    let pid = auxilary.get_next_partition_id(upperbound);
+                    Ok(pid)
+                }
+                PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
+                    self.partitioning.value[..self.partitioning.length as 
usize]
+                        .try_into()
+                        .map_err(|_| IggyError::InvalidNumberEncoding)?,
+                ) as usize),
+                PartitioningKind::MessagesKey => {
+                    let upperbound = root.partitions().len();
+                    Ok(
+                        
topics::helpers::calculate_partition_id_by_messages_key_hash(
+                            upperbound,
+                            &self.partitioning.value,
+                        ),
+                    )
+                }
+            },
+        )?;
+
+        let namespace = IggyNamespace::new(numeric_stream_id, 
numeric_topic_id, partition_id);
         let user_id = session.get_user_id();
+        let balanced_partitioning = matches!(self.partitioning.kind, 
PartitioningKind::Balanced);
+        let enabled_socket_migration = shard.config.tcp.socket_migration;
+
+        if enabled_socket_migration
+            && !(session.is_migrated() || balanced_partitioning)
+            && let Some(target_shard) = shard.find_shard(&namespace)
+            && target_shard.id != shard.id
+        {
+            info!(
+                "TCP wrong shared detected: from_shard {}, to_shard {}",

Review Comment:
   Typo ("shared" should be "shard"). Also, this message is logged at `info` level; is that intended?



##########
core/configs/server.toml:
##########
@@ -141,6 +141,9 @@ enabled = true
 # For example, "127.0.0.1:8090" listens on localhost only on port 8090.
 address = "127.0.0.1:8090"
 
+# Enable TCP socket migration across shards.

Review Comment:
   Add a startup log message to the server when it's enabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to