This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 96c3b1fb9 feat(server): socket migration across shard (#2476)
96c3b1fb9 is described below

commit 96c3b1fb9a7be6bf0c92ff0cb87dc387ed6a3d08
Author: tungtose <[email protected]>
AuthorDate: Sun Dec 21 18:04:57 2025 +0700

    feat(server): socket migration across shard (#2476)
    
    This PR addressing #2386
    
    Enable TCP socket migration across shards. When a client sends messages
    to the wrong shard, migrate the socket to the correct shard to eliminate
    cross-shard forwarding overhead
---
 Cargo.lock                                         |   1 +
 Cargo.toml                                         |   1 +
 core/common/Cargo.toml                             |   1 +
 core/common/src/sender/mod.rs                      |  27 ++++-
 core/common/src/sender/tcp_sender.rs               |  49 ++++++---
 core/configs/server.toml                           |   3 +
 core/server/Cargo.toml                             |   2 +-
 core/server/src/binary/command.rs                  |  11 +-
 .../cluster/get_cluster_metadata_handler.rs        |   8 +-
 .../create_consumer_group_handler.rs               |   8 +-
 .../delete_consumer_group_handler.rs               |   8 +-
 .../consumer_groups/get_consumer_group_handler.rs  |  12 ++-
 .../consumer_groups/get_consumer_groups_handler.rs |   8 +-
 .../consumer_groups/join_consumer_group_handler.rs |   8 +-
 .../leave_consumer_group_handler.rs                |   8 +-
 .../delete_consumer_offset_handler.rs              |   8 +-
 .../get_consumer_offset_handler.rs                 |  12 ++-
 .../store_consumer_offset_handler.rs               |   8 +-
 .../messages/flush_unsaved_buffer_handler.rs       |   8 +-
 .../handlers/messages/poll_messages_handler.rs     |   8 +-
 .../handlers/messages/send_messages_handler.rs     | 111 ++++++++++++++++++---
 .../partitions/create_partitions_handler.rs        |   8 +-
 .../partitions/delete_partitions_handler.rs        |   8 +-
 .../create_personal_access_token_handler.rs        |   8 +-
 .../delete_personal_access_token_handler.rs        |   8 +-
 .../get_personal_access_tokens_handler.rs          |   8 +-
 .../login_with_personal_access_token_handler.rs    |   8 +-
 .../handlers/segments/delete_segments_handler.rs   |   8 +-
 .../handlers/streams/create_stream_handler.rs      |   8 +-
 .../handlers/streams/delete_stream_handler.rs      |   8 +-
 .../binary/handlers/streams/get_stream_handler.rs  |  12 ++-
 .../binary/handlers/streams/get_streams_handler.rs |   8 +-
 .../handlers/streams/purge_stream_handler.rs       |   8 +-
 .../handlers/streams/update_stream_handler.rs      |   8 +-
 .../binary/handlers/system/get_client_handler.rs   |  12 ++-
 .../binary/handlers/system/get_clients_handler.rs  |   8 +-
 .../src/binary/handlers/system/get_me_handler.rs   |   8 +-
 .../src/binary/handlers/system/get_snapshot.rs     |   8 +-
 .../binary/handlers/system/get_stats_handler.rs    |   8 +-
 .../src/binary/handlers/system/ping_handler.rs     |   6 +-
 .../binary/handlers/topics/create_topic_handler.rs |   8 +-
 .../binary/handlers/topics/delete_topic_handler.rs |   8 +-
 .../binary/handlers/topics/get_topic_handler.rs    |  12 ++-
 .../binary/handlers/topics/get_topics_handler.rs   |   8 +-
 .../binary/handlers/topics/purge_topic_handler.rs  |   8 +-
 .../binary/handlers/topics/update_topic_handler.rs |   8 +-
 .../handlers/users/change_password_handler.rs      |   8 +-
 .../binary/handlers/users/create_user_handler.rs   |   8 +-
 .../binary/handlers/users/delete_user_handler.rs   |   8 +-
 .../src/binary/handlers/users/get_user_handler.rs  |  12 ++-
 .../src/binary/handlers/users/get_users_handler.rs |   8 +-
 .../binary/handlers/users/login_user_handler.rs    |   8 +-
 .../binary/handlers/users/logout_user_handler.rs   |   8 +-
 .../handlers/users/update_permissions_handler.rs   |   8 +-
 .../binary/handlers/users/update_user_handler.rs   |   8 +-
 core/server/src/configs/defaults.rs                |   1 +
 core/server/src/configs/displays.rs                |   4 +-
 core/server/src/configs/tcp.rs                     |   1 +
 core/server/src/http/http_server.rs                |  10 +-
 core/server/src/http/http_shard_wrapper.rs         |  35 ++++++-
 core/server/src/main.rs                            |   5 +
 core/server/src/shard/communication.rs             |   2 +-
 core/server/src/shard/handlers.rs                  |  93 ++++++++++++++++-
 core/server/src/shard/mod.rs                       |  18 +---
 core/server/src/shard/system/messages.rs           |  33 +-----
 .../src/shard/tasks/continuous/message_pump.rs     |   4 +-
 core/server/src/shard/transmission/frame.rs        |   1 +
 core/server/src/shard/transmission/message.rs      |  10 ++
 core/server/src/streaming/session.rs               |  13 +++
 core/server/src/tcp/connection_handler.rs          |  33 ++++--
 core/server/src/tcp/tcp_listener.rs                |  47 ++++++---
 core/server/src/websocket/websocket_listener.rs    |   4 +-
 .../server/src/websocket/websocket_tls_listener.rs |   4 +-
 73 files changed, 655 insertions(+), 259 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index adc4d5afe..0f671da6d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4657,6 +4657,7 @@ dependencies = [
  "figment",
  "human-repr",
  "humantime",
+ "nix",
  "once_cell",
  "rcgen",
  "rustls",
diff --git a/Cargo.toml b/Cargo.toml
index dd31ec539..2ccb4b1ea 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -145,6 +145,7 @@ lazy_static = "1.5.0"
 log = "0.4.29"
 mimalloc = "0.1"
 mockall = "0.14.0"
+nix = { version = "0.30.1", features = ["fs", "resource"] }
 nonzero_lit = "0.1.2"
 once_cell = "1.21.3"
 parquet = "=55.2.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index fbfe9b3a5..e935435ef 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -54,6 +54,7 @@ fast-async-mutex = { version = "0.6.7", optional = true }
 figment = { workspace = true }
 human-repr = { workspace = true }
 humantime = { workspace = true }
+nix = { workspace = true }
 once_cell = { workspace = true }
 rcgen = "0.14.6"
 rustls = { workspace = true }
diff --git a/core/common/src/sender/mod.rs b/core/common/src/sender/mod.rs
index 28d73f87a..b1da6e254 100644
--- a/core/common/src/sender/mod.rs
+++ b/core/common/src/sender/mod.rs
@@ -37,7 +37,8 @@ use compio::net::TcpStream;
 use compio_quic::{RecvStream, SendStream};
 use compio_tls::TlsStream;
 use std::future::Future;
-use tracing::debug;
+use std::os::fd::{AsFd, OwnedFd};
+use tracing::{debug, error};
 
 macro_rules! forward_async_methods {
     (
@@ -92,7 +93,9 @@ pub enum SenderKind {
 
 impl SenderKind {
     pub fn get_tcp_sender(stream: TcpStream) -> Self {
-        Self::Tcp(TcpSender { stream })
+        Self::Tcp(TcpSender {
+            stream: Some(stream),
+        })
     }
 
     pub fn get_tcp_tls_sender(stream: TlsStream<TcpStream>) -> Self {
@@ -114,6 +117,26 @@ impl SenderKind {
         Self::WebSocketTls(stream)
     }
 
+    pub fn take_and_migrate_tcp(&mut self) -> Option<OwnedFd> {
+        match self {
+            SenderKind::Tcp(tcp_sender) => {
+                let stream = tcp_sender.stream.take()?;
+                let poll_fd = stream.into_poll_fd().ok()?;
+
+                let raw_fd = poll_fd.as_fd();
+                let Ok(owned_fd) = nix::unistd::dup(raw_fd) else {
+                    // TODO(tungtose): recover tcp stream?
+                    error!("Failed to dup fd");
+                    return None;
+                };
+
+                Some(owned_fd)
+            }
+            // TODO(tungtose): support TCP TLS
+            _ => None,
+        }
+    }
+
     forward_async_methods! {
         async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), 
IggyError>, B);
         async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>;
diff --git a/core/common/src/sender/tcp_sender.rs 
b/core/common/src/sender/tcp_sender.rs
index 974c4c013..3dc008bce 100644
--- a/core/common/src/sender/tcp_sender.rs
+++ b/core/common/src/sender/tcp_sender.rs
@@ -27,34 +27,50 @@ const COMPONENT: &str = "TCP";
 
 #[derive(Debug)]
 pub struct TcpSender {
-    pub(crate) stream: TcpStream,
+    pub(crate) stream: Option<TcpStream>,
 }
 
 impl Sender for TcpSender {
     async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), 
IggyError>, B) {
-        super::read(&mut self.stream, buffer).await
+        match self.stream.as_mut() {
+            Some(stream) => super::read(stream, buffer).await,
+            None => (Err(IggyError::ConnectionClosed), buffer),
+        }
     }
 
     async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
-        super::send_empty_ok_response(&mut self.stream).await
+        match self.stream.as_mut() {
+            Some(stream) => super::send_empty_ok_response(stream).await,
+            None => Err(IggyError::ConnectionClosed),
+        }
     }
 
     async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), 
IggyError> {
-        super::send_ok_response(&mut self.stream, payload).await
+        match self.stream.as_mut() {
+            Some(stream) => super::send_ok_response(stream, payload).await,
+            None => Err(IggyError::ConnectionClosed),
+        }
     }
 
     async fn send_error_response(&mut self, error: IggyError) -> Result<(), 
IggyError> {
-        super::send_error_response(&mut self.stream, error).await
+        match self.stream.as_mut() {
+            Some(stream) => super::send_error_response(stream, error).await,
+            None => Err(IggyError::ConnectionClosed),
+        }
     }
 
     async fn shutdown(&mut self) -> Result<(), IggyError> {
-        self.stream
-            .shutdown()
-            .await
-            .with_error(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to shutdown TCP 
stream")
-            })
-            .map_err(|e| IggyError::IoError(e.to_string()))
+        match self.stream.as_mut() {
+            Some(stream) => stream
+                .shutdown()
+                .await
+                .with_error(|error| {
+                    format!("{COMPONENT} (error: {error}) - failed to shutdown 
TCP stream")
+                })
+                .map_err(|e| IggyError::IoError(e.to_string())),
+
+            None => Err(IggyError::ConnectionClosed),
+        }
     }
 
     async fn send_ok_response_vectored(
@@ -62,6 +78,13 @@ impl Sender for TcpSender {
         length: &[u8],
         slices: Vec<PooledBuffer>,
     ) -> Result<(), IggyError> {
-        super::send_ok_response_vectored(&mut self.stream, length, 
slices).await
+        if self.stream.is_none() {
+            tracing::error!("Tried to send but stream is None!");
+        }
+        match self.stream.as_mut() {
+            Some(stream) => super::send_ok_response_vectored(stream, length, 
slices).await,
+
+            None => Err(IggyError::ConnectionClosed),
+        }
     }
 }
diff --git a/core/configs/server.toml b/core/configs/server.toml
index d795de0a9..cec71ddc7 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -151,6 +151,9 @@ enabled = true
 # For example, "127.0.0.1:8090" listens on localhost only on port 8090.
 address = "127.0.0.1:8090"
 
+# Enable TCP socket migration across shards.
+socket_migration = true
+
 # Whether to use ipv4 or ipv6
 ipv6 = false
 
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index d10f082f5..bbf15794a 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -78,7 +78,7 @@ lending-iterator = "0.1.7"
 mimalloc = { workspace = true, optional = true }
 mime_guess = { version = "2.0", optional = true }
 moka = { version = "0.12.11", features = ["future"] }
-nix = { version = "0.30", features = ["fs", "resource"] }
+nix = { workspace = true }
 opentelemetry = { version = "0.31.0", features = ["trace", "logs"] }
 opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] }
 opentelemetry-otlp = { version = "0.31.0", features = [
diff --git a/core/server/src/binary/command.rs 
b/core/server/src/binary/command.rs
index 1f9389f38..e10fe6ab8 100644
--- a/core/server/src/binary/command.rs
+++ b/core/server/src/binary/command.rs
@@ -121,6 +121,15 @@ define_server_command_enum! {
     LeaveConsumerGroup(LeaveConsumerGroup), LEAVE_CONSUMER_GROUP_CODE, 
LEAVE_CONSUMER_GROUP, true;
 }
 
+/// Indicates whether a command handler completed normally or migrated the 
connection.
+pub enum HandlerResult {
+    /// Command completed, connection stays on current shard.
+    Finished,
+
+    /// Connection was migrated to another shard. Source shard should exit 
without cleanup.
+    Migrated { to_shard: u16 },
+}
+
 #[enum_dispatch]
 pub trait ServerCommandHandler {
     /// Return the command code
@@ -134,7 +143,7 @@ pub trait ServerCommandHandler {
         length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError>;
+    ) -> Result<HandlerResult, IggyError>;
 }
 
 pub trait BinaryServerCommand {
diff --git 
a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs 
b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
index 0de5a4ba1..af6c7a9a0 100644
--- a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
+++ b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
@@ -18,7 +18,9 @@
 
 use std::rc::Rc;
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
@@ -40,14 +42,14 @@ impl ServerCommandHandler for GetClusterMetadata {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let cluster_metadata = shard.get_cluster_metadata(session)?;
 
         let response = cluster_metadata.to_bytes();
         sender.send_ok_response(&response).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index 38cd1d5f5..0c6e9a763 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::consumer_groups::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -45,7 +47,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let cg = shard.create_consumer_group(
             session,
@@ -86,7 +88,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
             |(root, members)| mapper::map_consumer_group(root, members),
         );
         sender.send_ok_response(&response).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index e406cb038..1da98ac44 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::consumer_groups::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -45,7 +47,7 @@ impl ServerCommandHandler for DeleteConsumerGroup {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let cg = shard.delete_consumer_group(session, &self.stream_id, 
&self.topic_id, &self.group_id).with_error(|error| {
             format!(
@@ -123,7 +125,7 @@ impl ServerCommandHandler for DeleteConsumerGroup {
                 )
             })?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
index cae09dced..7156a4d53 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
@@ -40,7 +42,7 @@ impl ServerCommandHandler for GetConsumerGroup {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         shard.ensure_authenticated(session)?;
         let exists = shard
@@ -48,7 +50,7 @@ impl ServerCommandHandler for GetConsumerGroup {
             .is_ok();
         if !exists {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         }
         let numeric_topic_id = shard.streams.with_topic_by_id(
             &self.stream_id,
@@ -65,7 +67,7 @@ impl ServerCommandHandler for GetConsumerGroup {
             .is_ok();
         if !has_permission {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         }
 
         let consumer_group = shard.streams.with_consumer_group_by_id(
@@ -75,7 +77,7 @@ impl ServerCommandHandler for GetConsumerGroup {
             |(root, members)| mapper::map_consumer_group(root, members),
         );
         sender.send_ok_response(&consumer_group).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
index 844f959db..d59f33c97 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for GetConsumerGroups {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         shard.ensure_authenticated(session)?;
         shard.ensure_topic_exists(&self.stream_id, &self.topic_id)?;
@@ -69,7 +71,7 @@ impl ServerCommandHandler for GetConsumerGroups {
                     })
                 });
         sender.send_ok_response(&consumer_groups).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
index 4eb2b9196..ad68aa333 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::consumer_groups::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -41,7 +43,7 @@ impl ServerCommandHandler for JoinConsumerGroup {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         shard
             .join_consumer_group(
@@ -58,7 +60,7 @@ impl ServerCommandHandler for JoinConsumerGroup {
             })?;
 
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index 9601d1b7c..a39ede547 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -17,7 +17,9 @@
  */
 
 use super::COMPONENT;
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use iggy_common::SenderKind;
 
@@ -42,7 +44,7 @@ impl ServerCommandHandler for LeaveConsumerGroup {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         shard
@@ -60,7 +62,7 @@ impl ServerCommandHandler for LeaveConsumerGroup {
             })?;
 
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
 
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
index caf154f00..ee7aa269b 100644
--- 
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::consumer_offsets::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
@@ -39,7 +41,7 @@ impl ServerCommandHandler for DeleteConsumerOffset {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         shard
             .delete_consumer_offset(
@@ -54,7 +56,7 @@ impl ServerCommandHandler for DeleteConsumerOffset {
                 self.topic_id, self.stream_id, self.partition_id, session
             ))?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
 
b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
index cc79b2f52..29ee6f310 100644
--- 
a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
@@ -39,7 +41,7 @@ impl ServerCommandHandler for GetConsumerOffset {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let Ok(offset) = shard
             .get_consumer_offset(
@@ -52,17 +54,17 @@ impl ServerCommandHandler for GetConsumerOffset {
             .await
         else {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         };
 
         let Some(offset) = offset else {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         };
 
         let offset = mapper::map_consumer_offset(&offset);
         sender.send_ok_response(&offset).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
 
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
index b422c3370..cbc99df99 100644
--- 
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
@@ -18,7 +18,9 @@
 
 use std::rc::Rc;
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::consumer_offsets::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for StoreConsumerOffset {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         shard
             .store_consumer_offset(
@@ -57,7 +59,7 @@ impl ServerCommandHandler for StoreConsumerOffset {
                 self.stream_id, self.topic_id, self.partition_id, self.offset, 
session
             ))?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs 
b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
index 85099ecfa..30c70cd47 100644
--- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
+++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::messages::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
@@ -39,7 +41,7 @@ impl ServerCommandHandler for FlushUnsavedBuffer {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let user_id = session.get_user_id();
@@ -64,7 +66,7 @@ impl ServerCommandHandler for FlushUnsavedBuffer {
                 )
             })?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index 85abb0450..0839e3a55 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
 use crate::shard::system::messages::PollingArgs;
@@ -53,7 +55,7 @@ impl ServerCommandHandler for PollMessages {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let PollMessages {
             consumer,
@@ -111,7 +113,7 @@ impl ServerCommandHandler for PollMessages {
         sender
             .send_ok_response_vectored(&response_length_bytes, bufs)
             .await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs 
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 017056042..1d2e5d9d0 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -16,20 +16,23 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommandHandler};
+use crate::binary::command::{BinaryServerCommand, HandlerResult, 
ServerCommandHandler};
 use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::transmission::message::{ShardMessage, ShardRequest, 
ShardRequestPayload};
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
+use crate::streaming::{streams, topics};
 use anyhow::Result;
 use compio::buf::{IntoInner as _, IoBuf};
-use iggy_common::INDEX_SIZE;
 use iggy_common::Identifier;
 use iggy_common::PooledBuffer;
 use iggy_common::SenderKind;
 use iggy_common::Sizeable;
+use iggy_common::{INDEX_SIZE, PartitioningKind};
 use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
 use std::rc::Rc;
-use tracing::instrument;
+use tracing::{debug, error, info, instrument};
 
 impl ServerCommandHandler for SendMessages {
     fn code(&self) -> u32 {
@@ -49,7 +52,7 @@ impl ServerCommandHandler for SendMessages {
         length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         let total_payload_size = length as usize - std::mem::size_of::<u32>();
         let metadata_len_field_size = std::mem::size_of::<u32>();
 
@@ -107,19 +110,103 @@ impl ServerCommandHandler for SendMessages {
         );
         batch.validate()?;
 
+        shard.ensure_topic_exists(&self.stream_id, &self.topic_id)?;
+
+        let numeric_stream_id = shard
+            .streams
+            .with_stream_by_id(&self.stream_id, 
streams::helpers::get_stream_id());
+
+        let numeric_topic_id = shard.streams.with_topic_by_id(
+            &self.stream_id,
+            &self.topic_id,
+            topics::helpers::get_topic_id(),
+        );
+
+        // TODO(tungtose): dry this code && get partition_id below have a side 
effect
+        let partition_id = shard.streams.with_topic_by_id(
+            &self.stream_id,
+            &self.topic_id,
+            |(root, auxilary, ..)| match self.partitioning.kind {
+                PartitioningKind::Balanced => {
+                    let upperbound = root.partitions().len();
+                    let pid = auxilary.get_next_partition_id(upperbound);
+                    Ok(pid)
+                }
+                PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
+                    self.partitioning.value[..self.partitioning.length as 
usize]
+                        .try_into()
+                        .map_err(|_| IggyError::InvalidNumberEncoding)?,
+                ) as usize),
+                PartitioningKind::MessagesKey => {
+                    let upperbound = root.partitions().len();
+                    Ok(
+                        
topics::helpers::calculate_partition_id_by_messages_key_hash(
+                            upperbound,
+                            &self.partitioning.value,
+                        ),
+                    )
+                }
+            },
+        )?;
+
+        let namespace = IggyNamespace::new(numeric_stream_id, 
numeric_topic_id, partition_id);
         let user_id = session.get_user_id();
+        let unsupport_socket_transfer = matches!(
+            self.partitioning.kind,
+            PartitioningKind::Balanced | PartitioningKind::MessagesKey
+        );
+        let enabled_socket_migration = shard.config.tcp.socket_migration;
+
+        if enabled_socket_migration
+            && !(session.is_migrated() || unsupport_socket_transfer)
+            && let Some(target_shard) = shard.find_shard(&namespace)
+            && target_shard.id != shard.id
+        {
+            debug!(
+                "TCP wrong shared detected: migrating from_shard {}, to_shard 
{}",
+                shard.id, target_shard.id
+            );
+
+            if let Some(fd) = sender.take_and_migrate_tcp() {
+                let payload = ShardRequestPayload::SocketTransfer {
+                    fd,
+                    from_shard: shard.id,
+                    client_id: session.client_id,
+                    user_id,
+                    address: session.ip_address,
+                    initial_data: batch,
+                };
+
+                let request = ShardRequest::new(
+                    self.stream_id.clone(),
+                    self.topic_id.clone(),
+                    partition_id,
+                    payload,
+                );
+
+                let socket_transfer_msg = ShardMessage::Request(request);
+
+                if let Err(e) = shard
+                    .send_request_to_shard_or_recoil(Some(&namespace), 
socket_transfer_msg)
+                    .await
+                {
+                    error!("tranfer socket to another shard failed, drop 
connection. {e:?}");
+                    return Ok(HandlerResult::Finished);
+                }
+
+                info!("Sending socket transfer to shard {}", target_shard.id);
+                return Ok(HandlerResult::Migrated {
+                    to_shard: target_shard.id,
+                });
+            }
+        }
+
         shard
-            .append_messages(
-                user_id,
-                self.stream_id,
-                self.topic_id,
-                &self.partitioning,
-                batch,
-            )
+            .append_messages(user_id, self.stream_id, self.topic_id, 
partition_id, batch)
             .await?;
 
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs 
b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index a6792939b..0ead0c4ad 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::partitions::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -45,7 +47,7 @@ impl ServerCommandHandler for CreatePartitions {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         // Acquire partition lock to serialize filesystem operations
@@ -94,7 +96,7 @@ impl ServerCommandHandler for CreatePartitions {
             )
         })?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs 
b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 63bca4e21..fe6740136 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::partitions::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -43,7 +45,7 @@ impl ServerCommandHandler for DeletePartitions {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
@@ -91,7 +93,7 @@ impl ServerCommandHandler for DeletePartitions {
             )
         })?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
 
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index 5a1f3b652..c52ddd4eb 100644
--- 
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++ 
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::personal_access_tokens::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -45,7 +47,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let (personal_access_token, token) = shard
@@ -83,7 +85,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
                 )
             })?;
         sender.send_ok_response(&bytes).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
 
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index 0763f574d..9a588ccbe 100644
--- 
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++ 
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::personal_access_tokens::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let token_name = self.name.clone();
 
@@ -71,7 +73,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
                 "{COMPONENT} (error: {error}) - failed to apply delete 
personal access token with name: {token_name}, session: {session}"
             )})?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
 
b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
index ab453a6ce..3c5df6417 100644
--- 
a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
+++ 
b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::personal_access_tokens::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -40,7 +42,7 @@ impl ServerCommandHandler for GetPersonalAccessTokens {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let personal_access_tokens = shard
             .get_personal_access_tokens(session)
@@ -49,7 +51,7 @@ impl ServerCommandHandler for GetPersonalAccessTokens {
             })?;
         let personal_access_tokens = 
mapper::map_personal_access_tokens(personal_access_tokens);
         sender.send_ok_response(&personal_access_tokens).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
 
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
index 18cd634f6..921a813e4 100644
--- 
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
+++ 
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
@@ -18,7 +18,9 @@
 use crate::shard::IggyShard;
 use std::rc::Rc;
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::personal_access_tokens::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let user = shard
             .login_with_personal_access_token(&self.token, Some(session))
@@ -57,7 +59,7 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
             })?;
         let identity_info = mapper::map_identity_info(user.id);
         sender.send_ok_response(&identity_info).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/segments/delete_segments_handler.rs 
b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index b75c71a33..2d161d411 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::partitions::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
@@ -47,7 +49,7 @@ impl ServerCommandHandler for DeleteSegments {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let stream_id = self.stream_id.clone();
@@ -123,7 +125,7 @@ impl ServerCommandHandler for DeleteSegments {
             })?;
 
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs 
b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index cde50e3ce..e56f87e64 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -50,7 +52,7 @@ impl ServerCommandHandler for CreateStream {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let request = ShardRequest {
@@ -136,7 +138,7 @@ impl ServerCommandHandler for CreateStream {
             },
         }
 
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs 
b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index 67d7add90..76bc9f8ca 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
@@ -48,7 +50,7 @@ impl ServerCommandHandler for DeleteStream {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
         let request = ShardRequest {
@@ -119,7 +121,7 @@ impl ServerCommandHandler for DeleteStream {
             },
         }
 
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/streams/get_stream_handler.rs 
b/core/server/src/binary/handlers/streams/get_stream_handler.rs
index da10b3f41..8d6f3f8ce 100644
--- a/core/server/src/binary/handlers/streams/get_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/get_stream_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
@@ -42,13 +44,13 @@ impl ServerCommandHandler for GetStream {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         shard.ensure_authenticated(session)?;
         let exists = shard.ensure_stream_exists(&self.stream_id).is_ok();
         if !exists {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         }
         let stream_id = shard
             .streams
@@ -67,13 +69,13 @@ impl ServerCommandHandler for GetStream {
             .is_ok();
         if !has_permission {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         }
         let response = shard
             .streams
             .with_components_by_id(stream_id, |(root, stats)| 
mapper::map_stream(&root, &stats));
         sender.send_ok_response(&response).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/streams/get_streams_handler.rs 
b/core/server/src/binary/handlers/streams/get_streams_handler.rs
index a3305663b..06e7f28c9 100644
--- a/core/server/src/binary/handlers/streams/get_streams_handler.rs
+++ b/core/server/src/binary/handlers/streams/get_streams_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -42,7 +44,7 @@ impl ServerCommandHandler for GetStreams {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         shard.ensure_authenticated(session)?;
         shard
@@ -61,7 +63,7 @@ impl ServerCommandHandler for GetStreams {
             mapper::map_streams(&roots, &stats)
         });
         sender.send_ok_response(&response).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs 
b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
index 861140bbf..b8d3cfc0c 100644
--- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -43,7 +45,7 @@ impl ServerCommandHandler for PurgeStream {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
 
@@ -64,7 +66,7 @@ impl ServerCommandHandler for PurgeStream {
                 format!("{COMPONENT} (error: {error}) - failed to apply purge 
stream with id: {stream_id}, session: {session}")
             })?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs 
b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index eeaca94e3..c080412a7 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::streams::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -43,7 +45,7 @@ impl ServerCommandHandler for UpdateStream {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
         shard
@@ -65,7 +67,7 @@ impl ServerCommandHandler for UpdateStream {
                 format!("{COMPONENT} (error: {error}) - failed to apply update 
stream with id: {stream_id}, session: {session}")
             })?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/system/get_client_handler.rs 
b/core/server/src/binary/handlers/system/get_client_handler.rs
index bdcc8ccc2..b9e16aef3 100644
--- a/core/server/src/binary/handlers/system/get_client_handler.rs
+++ b/core/server/src/binary/handlers/system/get_client_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
@@ -38,23 +40,23 @@ impl ServerCommandHandler for GetClient {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let Ok(client) = shard.get_client(session, self.client_id) else {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         };
 
         let Some(client) = client else {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         };
 
         let bytes = mapper::map_client(&client);
 
         sender.send_ok_response(&bytes).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/system/get_clients_handler.rs 
b/core/server/src/binary/handlers/system/get_clients_handler.rs
index 1a1865641..232a03f6b 100644
--- a/core/server/src/binary/handlers/system/get_clients_handler.rs
+++ b/core/server/src/binary/handlers/system/get_clients_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::system::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -40,7 +42,7 @@ impl ServerCommandHandler for GetClients {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let clients = shard.get_clients(session).with_error(|error| {
@@ -48,7 +50,7 @@ impl ServerCommandHandler for GetClients {
         })?;
         let clients = mapper::map_clients(clients).await;
         sender.send_ok_response(&clients).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/system/get_me_handler.rs 
b/core/server/src/binary/handlers/system/get_me_handler.rs
index ba4bb3b79..11b35c5a2 100644
--- a/core/server/src/binary/handlers/system/get_me_handler.rs
+++ b/core/server/src/binary/handlers/system/get_me_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::system::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -39,7 +41,7 @@ impl ServerCommandHandler for GetMe {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         let Some(client) = shard
             .get_client(session, session.client_id)
             .with_error(|error| {
@@ -52,7 +54,7 @@ impl ServerCommandHandler for GetMe {
         let bytes = mapper::map_client(&client);
 
         sender.send_ok_response(&bytes).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/system/get_snapshot.rs 
b/core/server/src/binary/handlers/system/get_snapshot.rs
index f352eacb7..cc2163b91 100644
--- a/core/server/src/binary/handlers/system/get_snapshot.rs
+++ b/core/server/src/binary/handlers/system/get_snapshot.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
@@ -38,7 +40,7 @@ impl ServerCommandHandler for GetSnapshot {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let snapshot = shard
@@ -46,7 +48,7 @@ impl ServerCommandHandler for GetSnapshot {
             .await?;
         let bytes = Bytes::copy_from_slice(&snapshot.0);
         sender.send_ok_response(&bytes).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/system/get_stats_handler.rs 
b/core/server/src/binary/handlers/system/get_stats_handler.rs
index a318a4c98..a655a377d 100644
--- a/core/server/src/binary/handlers/system/get_stats_handler.rs
+++ b/core/server/src/binary/handlers/system/get_stats_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::system::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -44,7 +46,7 @@ impl ServerCommandHandler for GetStats {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         // Route GetStats to shard0 only
@@ -82,7 +84,7 @@ impl ServerCommandHandler for GetStats {
             },
         }
 
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/system/ping_handler.rs 
b/core/server/src/binary/handlers/system/ping_handler.rs
index e1c1fbe7e..e22fa7ba8 100644
--- a/core/server/src/binary/handlers/system/ping_handler.rs
+++ b/core/server/src/binary/handlers/system/ping_handler.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommandHandler};
+use crate::binary::command::{BinaryServerCommand, HandlerResult, 
ServerCommandHandler};
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -38,7 +38,7 @@ impl ServerCommandHandler for Ping {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         if let Some(mut client) = 
shard.client_manager.try_get_client_mut(session.client_id) {
             let now = IggyTimestamp::now();
@@ -47,7 +47,7 @@ impl ServerCommandHandler for Ping {
         }
 
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs 
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 8168d873b..b5b5c96a2 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -51,7 +53,7 @@ impl ServerCommandHandler for CreateTopic {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let request = ShardRequest {
@@ -192,7 +194,7 @@ impl ServerCommandHandler for CreateTopic {
             },
         }
 
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs 
b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index c25aa005b..ea3d21cdd 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -50,7 +52,7 @@ impl ServerCommandHandler for DeleteTopic {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let request = ShardRequest {
@@ -132,7 +134,7 @@ impl ServerCommandHandler for DeleteTopic {
             },
         }
 
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs 
b/core/server/src/binary/handlers/topics/get_topic_handler.rs
index 14fcabc67..fcc42ac0b 100644
--- a/core/server/src/binary/handlers/topics/get_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
@@ -40,7 +42,7 @@ impl ServerCommandHandler for GetTopic {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         shard.ensure_authenticated(session)?;
         let exists = shard
@@ -48,7 +50,7 @@ impl ServerCommandHandler for GetTopic {
             .is_ok();
         if !exists {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         }
 
         let numeric_stream_id = shard
@@ -69,7 +71,7 @@ impl ServerCommandHandler for GetTopic {
             .is_ok();
         if !has_permission {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         }
 
         let response =
@@ -79,7 +81,7 @@ impl ServerCommandHandler for GetTopic {
                     mapper::map_topic(&root, &stats)
                 });
         sender.send_ok_response(&response).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs 
b/core/server/src/binary/handlers/topics/get_topics_handler.rs
index 37123afce..96c3f9325 100644
--- a/core/server/src/binary/handlers/topics/get_topics_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for GetTopics {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         shard.ensure_authenticated(session)?;
         shard.ensure_stream_exists(&self.stream_id)?;
@@ -60,7 +62,7 @@ impl ServerCommandHandler for GetTopics {
             })
         });
         sender.send_ok_response(&response).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs 
b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
index abc1651b4..2cea843ed 100644
--- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for PurgeTopic {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let topic_id = self.topic_id.clone();
         let stream_id = self.stream_id.clone();
@@ -72,7 +74,7 @@ impl ServerCommandHandler for PurgeTopic {
             )
             })?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs 
b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index a553cefff..6dfa2281d 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::topics::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -48,7 +50,7 @@ impl ServerCommandHandler for UpdateTopic {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let request = ShardRequest {
@@ -165,7 +167,7 @@ impl ServerCommandHandler for UpdateTopic {
             },
         }
 
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs 
b/core/server/src/binary/handlers/users/change_password_handler.rs
index fb03a46eb..dc512e172 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -45,7 +47,7 @@ impl ServerCommandHandler for ChangePassword {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         info!("Changing password for user with ID: {}...", self.user_id);
@@ -91,7 +93,7 @@ impl ServerCommandHandler for ChangePassword {
                 )
             })?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs 
b/core/server/src/binary/handlers/users/create_user_handler.rs
index f4cdcec73..0caa873f1 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -50,7 +52,7 @@ impl ServerCommandHandler for CreateUser {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let request = ShardRequest {
@@ -167,7 +169,7 @@ impl ServerCommandHandler for CreateUser {
             },
         }
 
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs 
b/core/server/src/binary/handlers/users/delete_user_handler.rs
index abf74cb4a..55b0f103d 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -18,7 +18,9 @@
 
 use std::rc::Rc;
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -49,7 +51,7 @@ impl ServerCommandHandler for DeleteUser {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let request = ShardRequest {
@@ -122,7 +124,7 @@ impl ServerCommandHandler for DeleteUser {
             },
         }
 
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/users/get_user_handler.rs 
b/core/server/src/binary/handlers/users/get_user_handler.rs
index 3a10279a8..97322a340 100644
--- a/core/server/src/binary/handlers/users/get_user_handler.rs
+++ b/core/server/src/binary/handlers/users/get_user_handler.rs
@@ -18,7 +18,9 @@
 
 use std::rc::Rc;
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::shard::IggyShard;
@@ -39,20 +41,20 @@ impl ServerCommandHandler for GetUser {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let Ok(user) = shard.find_user(session, &self.user_id) else {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         };
         let Some(user) = user else {
             sender.send_empty_ok_response().await?;
-            return Ok(());
+            return Ok(HandlerResult::Finished);
         };
 
         let bytes = mapper::map_user(&user);
         sender.send_ok_response(&bytes).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/users/get_users_handler.rs 
b/core/server/src/binary/handlers/users/get_users_handler.rs
index bd1a5b2d4..b67f85875 100644
--- a/core/server/src/binary/handlers/users/get_users_handler.rs
+++ b/core/server/src/binary/handlers/users/get_users_handler.rs
@@ -18,7 +18,9 @@
 
 use std::rc::Rc;
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -41,14 +43,14 @@ impl ServerCommandHandler for GetUsers {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         let users = shard.get_users(session).await.with_error(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get users, 
session: {session}")
         })?;
         let users = mapper::map_users(users);
         sender.send_ok_response(&users).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs 
b/core/server/src/binary/handlers/users/login_user_handler.rs
index 53bd4bceb..9c32126a5 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -16,7 +16,9 @@
  * under the License.
  */
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for LoginUser {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         if shard.is_shutting_down() {
             warn!("Rejecting login request during shutdown");
             return Err(IggyError::Disconnected);
@@ -64,7 +66,7 @@ impl ServerCommandHandler for LoginUser {
 
         let identity_info = mapper::map_identity_info(user.id);
         sender.send_ok_response(&identity_info).await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs 
b/core/server/src/binary/handlers/users/logout_user_handler.rs
index 850312f1e..92c1cccb7 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -18,7 +18,9 @@
 
 use std::rc::Rc;
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -43,7 +45,7 @@ impl ServerCommandHandler for LogoutUser {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
         info!("Logging out user with ID: {}...", session.get_user_id());
         shard.logout_user(session).with_error(|error| {
@@ -52,7 +54,7 @@ impl ServerCommandHandler for LogoutUser {
         info!("Logged out user with ID: {}.", session.get_user_id());
         session.clear_user_id();
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git 
a/core/server/src/binary/handlers/users/update_permissions_handler.rs 
b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index bc07707ef..948ce8e41 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -18,7 +18,9 @@
 
 use std::rc::Rc;
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -45,7 +47,7 @@ impl ServerCommandHandler for UpdatePermissions {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         shard
@@ -68,7 +70,7 @@ impl ServerCommandHandler for UpdatePermissions {
             )
             .await?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs 
b/core/server/src/binary/handlers/users/update_user_handler.rs
index c89878de2..f4e12a0f5 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -18,7 +18,9 @@
 
 use std::rc::Rc;
 
-use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
+use crate::binary::command::{
+    BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
 use crate::binary::handlers::users::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 
@@ -45,7 +47,7 @@ impl ServerCommandHandler for UpdateUser {
         _length: u32,
         session: &Session,
         shard: &Rc<IggyShard>,
-    ) -> Result<(), IggyError> {
+    ) -> Result<HandlerResult, IggyError> {
         debug!("session: {session}, command: {self}");
 
         let user =shard
@@ -82,7 +84,7 @@ impl ServerCommandHandler for UpdateUser {
                 )
             })?;
         sender.send_empty_ok_response().await?;
-        Ok(())
+        Ok(HandlerResult::Finished)
     }
 }
 
diff --git a/core/server/src/configs/defaults.rs 
b/core/server/src/configs/defaults.rs
index b2d810e1e..f6d9d83e5 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -129,6 +129,7 @@ impl Default for TcpConfig {
             ipv6: SERVER_CONFIG.tcp.ipv_6,
             tls: TcpTlsConfig::default(),
             socket: TcpSocketConfig::default(),
+            socket_migration: SERVER_CONFIG.tcp.socket_migration,
         }
     }
 }
diff --git a/core/server/src/configs/displays.rs 
b/core/server/src/configs/displays.rs
index a3e64bc62..8973b356a 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -263,8 +263,8 @@ impl Display for TcpConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ enabled: {}, address: {}, ipv6: {}, tls: {}, socket: {} }}",
-            self.enabled, self.address, self.ipv6, self.tls, self.socket,
+            "{{ enabled: {}, address: {}, ipv6: {}, tls: {}, socket: {}, 
socket_migration: {} }}",
+            self.enabled, self.address, self.ipv6, self.tls, self.socket, 
self.socket_migration
         )
     }
 }
diff --git a/core/server/src/configs/tcp.rs b/core/server/src/configs/tcp.rs
index eeec77793..02ec8bbd5 100644
--- a/core/server/src/configs/tcp.rs
+++ b/core/server/src/configs/tcp.rs
@@ -28,6 +28,7 @@ pub struct TcpConfig {
     pub ipv6: bool,
     pub tls: TcpTlsConfig,
     pub socket: TcpSocketConfig,
+    pub socket_migration: bool,
 }
 
 #[derive(Debug, Deserialize, Serialize, Clone)]
diff --git a/core/server/src/http/http_server.rs 
b/core/server/src/http/http_server.rs
index 3a114aecf..23eefbdab 100644
--- a/core/server/src/http/http_server.rs
+++ b/core/server/src/http/http_server.rs
@@ -147,7 +147,10 @@ pub async fn start_http_server(
             protocol: TransportProtocol::Http,
             address,
         };
-        shard.handle_event(event).await.ok();
+
+        crate::shard::handlers::handle_event(&shard, event)
+            .await
+            .ok();
 
         let service = 
app.into_make_service_with_connect_info::<CompioSocketAddr>();
 
@@ -191,7 +194,10 @@ pub async fn start_http_server(
             protocol: TransportProtocol::Http,
             address,
         };
-        shard.handle_event(event).await.ok();
+
+        crate::shard::handlers::handle_event(&shard, event)
+            .await
+            .ok();
 
         let service = app.into_make_service_with_connect_info::<SocketAddr>();
         let handle = axum_server::Handle::new();
diff --git a/core/server/src/http/http_shard_wrapper.rs 
b/core/server/src/http/http_shard_wrapper.rs
index b67fa8981..f279b51be 100644
--- a/core/server/src/http/http_shard_wrapper.rs
+++ b/core/server/src/http/http_shard_wrapper.rs
@@ -18,8 +18,8 @@
 use std::rc::Rc;
 
 use iggy_common::{
-    Consumer, ConsumerOffsetInfo, Identifier, IggyError, IggyExpiry, 
Partitioning, Permissions,
-    Stats, UserId, UserStatus,
+    Consumer, ConsumerOffsetInfo, Identifier, IggyError, IggyExpiry, 
Partitioning,
+    PartitioningKind, Permissions, Stats, UserId, UserStatus,
 };
 use send_wrapper::SendWrapper;
 
@@ -28,6 +28,7 @@ use crate::shard::system::messages::PollingArgs;
 use crate::state::command::EntryCommand;
 use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet};
+use crate::streaming::topics;
 use crate::streaming::users::user::User;
 use crate::{shard::IggyShard, streaming::session::Session};
 
@@ -299,11 +300,39 @@ impl HttpSafeShard {
         partitioning: &Partitioning,
         batch: IggyMessagesBatchMut,
     ) -> Result<(), IggyError> {
+        self.shard().ensure_topic_exists(&stream_id, &topic_id)?;
+
+        let partition_id = self.shard().streams.with_topic_by_id(
+            &stream_id,
+            &topic_id,
+            |(root, auxilary, ..)| match partitioning.kind {
+                PartitioningKind::Balanced => {
+                    let upperbound = root.partitions().len();
+                    let pid = auxilary.get_next_partition_id(upperbound);
+                    Ok(pid)
+                }
+                PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
+                    partitioning.value[..partitioning.length as usize]
+                        .try_into()
+                        .map_err(|_| IggyError::InvalidNumberEncoding)?,
+                ) as usize),
+                PartitioningKind::MessagesKey => {
+                    let upperbound = root.partitions().len();
+                    Ok(
+                        
topics::helpers::calculate_partition_id_by_messages_key_hash(
+                            upperbound,
+                            &partitioning.value,
+                        ),
+                    )
+                }
+            },
+        )?;
+
         let future = SendWrapper::new(self.shard().append_messages(
             user_id,
             stream_id,
             topic_id,
-            partitioning,
+            partition_id,
             batch,
         ));
         future.await
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 5ce51716d..23e7c515d 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -296,6 +296,11 @@ fn main() -> Result<(), ServerError> {
         let metrics = Metrics::init();
 
         // TWELFTH DISCRETE LOADING STEP.
+        info!(
+            "Enable TCP socket migration across shards: {}.",
+            config.tcp.socket_migration
+        );
+
         info!("Starting {} shard(s)", shard_assignment.len());
         let (connections, shutdown_handles) = 
create_shard_connections(&shard_assignment);
         let shards_count = shard_assignment.len();
diff --git a/core/server/src/shard/communication.rs 
b/core/server/src/shard/communication.rs
index d7e73c5e4..59dc11183 100644
--- a/core/server/src/shard/communication.rs
+++ b/core/server/src/shard/communication.rs
@@ -137,7 +137,7 @@ impl IggyShard {
         }
     }
 
-    fn find_shard(&self, namespace: &IggyNamespace) -> 
Option<&ShardConnector<ShardFrame>> {
+    pub fn find_shard(&self, namespace: &IggyNamespace) -> 
Option<&ShardConnector<ShardFrame>> {
         self.shards_table.get(namespace).map(|shard_id| {
             self.shards
                 .iter()
diff --git a/core/server/src/shard/handlers.rs 
b/core/server/src/shard/handlers.rs
index 61596b643..087b7a54d 100644
--- a/core/server/src/shard/handlers.rs
+++ b/core/server/src/shard/handlers.rs
@@ -27,12 +27,19 @@ use crate::{
         },
     },
     streaming::{session::Session, traits::MainOps},
+    tcp::{
+        connection_handler::{ConnectionAction, handle_connection, 
handle_error},
+        tcp_listener::cleanup_connection,
+    },
 };
-use iggy_common::{Identifier, IggyError, TransportProtocol};
+use compio_net::TcpStream;
+use iggy_common::{Identifier, IggyError, SenderKind, TransportProtocol};
+use nix::sys::stat::SFlag;
+use std::os::fd::{FromRawFd, IntoRawFd};
 use tracing::info;
 
 pub(super) async fn handle_shard_message(
-    shard: &IggyShard,
+    shard: &Rc<IggyShard>,
     message: ShardMessage,
 ) -> Option<ShardResponse> {
     match message {
@@ -48,7 +55,7 @@ pub(super) async fn handle_shard_message(
 }
 
 async fn handle_request(
-    shard: &IggyShard,
+    shard: &Rc<IggyShard>,
     request: ShardRequest,
 ) -> Result<ShardResponse, IggyError> {
     let stream_id = request.stream_id;
@@ -312,10 +319,88 @@ async fn handle_request(
             shard.broadcast_event_to_all_shards(event).await?;
             Ok(ShardResponse::DeleteStreamResponse(stream))
         }
+        ShardRequestPayload::SocketTransfer {
+            fd,
+            from_shard,
+            client_id,
+            user_id,
+            address,
+            initial_data,
+        } => {
+            info!(
+                "Received socket transfer msg, fd: {fd:?}, from_shard: 
{from_shard}, address: {address}"
+            );
+
+            // Safety: The fd already != 1.
+            let stat = nix::sys::stat::fstat(&fd)
+                .map_err(|e| IggyError::IoError(format!("Invalid fd: {}", 
e)))?;
+
+            if 
!SFlag::from_bits_truncate(stat.st_mode).contains(SFlag::S_IFSOCK) {
+                return Err(IggyError::IoError(format!("fd {:?} is not a 
socket", fd)));
+            }
+
+            // restore TcpStream from fd
+            let tcp_stream = unsafe { TcpStream::from_raw_fd(fd.into_raw_fd()) 
};
+            let session = shard.add_client(&address, TransportProtocol::Tcp);
+            session.set_user_id(user_id);
+            session.set_migrated();
+
+            let mut sender = SenderKind::get_tcp_sender(tcp_stream);
+            let conn_stop_receiver = 
shard.task_registry.add_connection(session.client_id);
+            let shard_for_conn = shard.clone();
+            let registry = shard.task_registry.clone();
+            let registry_clone = registry.clone();
+
+            let ns = IggyFullNamespace::new(stream_id, topic_id, partition_id);
+            let batch = shard.maybe_encrypt_messages(initial_data)?;
+            let messages_count = batch.count();
+
+            shard
+                .streams
+                .append_messages(&shard.config.system, &shard.task_registry, 
&ns, batch)
+                .await?;
+
+            shard.metrics.increment_messages(messages_count as u64);
+
+            sender.send_empty_ok_response().await?;
+
+            registry.spawn_connection(async move {
+                match handle_connection(&session, &mut sender, 
&shard_for_conn, conn_stop_receiver)
+                    .await
+                {
+                    Ok(ConnectionAction::Migrated { to_shard }) => {
+                        info!("Migrated to shard {to_shard}, ignore cleanup 
connection");
+                    }
+                    Ok(ConnectionAction::Finished) => {
+                        cleanup_connection(
+                            &mut sender,
+                            client_id,
+                            address,
+                            &registry_clone,
+                            &shard_for_conn,
+                        )
+                        .await;
+                    }
+                    Err(err) => {
+                        handle_error(err);
+                        cleanup_connection(
+                            &mut sender,
+                            client_id,
+                            address,
+                            &registry_clone,
+                            &shard_for_conn,
+                        )
+                        .await;
+                    }
+                }
+            });
+
+            Ok(ShardResponse::SocketTransferResponse)
+        }
     }
 }
 
-pub(crate) async fn handle_event(shard: &IggyShard, event: ShardEvent) -> 
Result<(), IggyError> {
+pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> 
Result<(), IggyError> {
     match event {
         ShardEvent::DeletedPartitions {
             stream_id,
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 32e07d76d..f48279362 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -24,7 +24,7 @@ pub mod tasks;
 pub mod transmission;
 
 mod communication;
-mod handlers;
+pub mod handlers;
 
 // Re-export for backwards compatibility
 pub use communication::calculate_shard_assignment;
@@ -34,13 +34,7 @@ use crate::{
     configs::server::ServerConfig,
     io::fs_locks::FsLocks,
     shard::{
-        namespace::IggyNamespace,
-        task_registry::TaskRegistry,
-        transmission::{
-            event::ShardEvent,
-            frame::{ShardFrame, ShardResponse},
-            message::ShardMessage,
-        },
+        namespace::IggyNamespace, task_registry::TaskRegistry, 
transmission::frame::ShardFrame,
     },
     slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
     state::file::FileState,
@@ -292,14 +286,6 @@ impl IggyShard {
         self.shards.len() as u32
     }
 
-    pub async fn handle_shard_message(&self, message: ShardMessage) -> 
Option<ShardResponse> {
-        handlers::handle_shard_message(self, message).await
-    }
-
-    pub(crate) async fn handle_event(&self, event: ShardEvent) -> Result<(), 
IggyError> {
-        handlers::handle_event(self, event).await
-    }
-
     pub fn ensure_authenticated(&self, session: &Session) -> Result<(), 
IggyError> {
         if !session.is_active() {
             error!("{COMPONENT} - session is inactive, session: {session}");
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index d10fd74a9..bbae65bbe 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -31,7 +31,7 @@ use err_trail::ErrContext;
 use iggy_common::PooledBuffer;
 use iggy_common::{
     BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, 
Identifier, IggyError,
-    Partitioning, PartitioningKind, PollingKind, PollingStrategy,
+    PollingKind, PollingStrategy,
 };
 use std::sync::atomic::Ordering;
 use tracing::error;
@@ -42,7 +42,7 @@ impl IggyShard {
         user_id: u32,
         stream_id: Identifier,
         topic_id: Identifier,
-        partitioning: &Partitioning,
+        partition_id: usize,
         batch: IggyMessagesBatchMut,
     ) -> Result<(), IggyError> {
         self.ensure_topic_exists(&stream_id, &topic_id)?;
@@ -71,34 +71,9 @@ impl IggyShard {
             return Ok(());
         }
 
-        let partition_id =
-            self.streams
-                .with_topic_by_id(
-                    &stream_id,
-                    &topic_id,
-                    |(root, auxilary, ..)| match partitioning.kind {
-                        PartitioningKind::Balanced => {
-                            let upperbound = root.partitions().len();
-                            Ok(auxilary.get_next_partition_id(upperbound))
-                        }
-                        PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
-                            partitioning.value[..partitioning.length as usize]
-                                .try_into()
-                                .map_err(|_| 
IggyError::InvalidNumberEncoding)?,
-                        ) as usize),
-                        PartitioningKind::MessagesKey => {
-                            let upperbound = root.partitions().len();
-                            Ok(
-                                
topics::helpers::calculate_partition_id_by_messages_key_hash(
-                                    upperbound,
-                                    &partitioning.value,
-                                ),
-                            )
-                        }
-                    },
-                )?;
-
         self.ensure_partition_exists(&stream_id, &topic_id, partition_id)?;
+
+        // TODO(tungtose): DRY this code
         let namespace = IggyNamespace::new(numeric_stream_id, 
numeric_topic_id, partition_id);
         let payload = ShardRequestPayload::SendMessages { batch };
         let request = ShardRequest::new(stream_id.clone(), topic_id.clone(), 
partition_id, payload);
diff --git a/core/server/src/shard/tasks/continuous/message_pump.rs 
b/core/server/src/shard/tasks/continuous/message_pump.rs
index a709630ec..97e1f341d 100644
--- a/core/server/src/shard/tasks/continuous/message_pump.rs
+++ b/core/server/src/shard/tasks/continuous/message_pump.rs
@@ -16,9 +16,9 @@
  * under the License.
  */
 
-use crate::shard::IggyShard;
 use crate::shard::task_registry::ShutdownToken;
 use crate::shard::transmission::frame::ShardFrame;
+use crate::shard::{IggyShard, handlers::handle_shard_message};
 use futures::FutureExt;
 use std::rc::Rc;
 use tracing::{debug, info};
@@ -57,7 +57,7 @@ async fn message_pump(
                 match frame {
                     Ok(ShardFrame { message, response_sender }) => {
                         if let (Some(response), Some(tx)) =
-                            (shard.handle_shard_message(message).await, 
response_sender)
+                            (handle_shard_message(&shard, message).await, 
response_sender)
                         {
                              let _ = tx.send(response).await;
                         }
diff --git a/core/server/src/shard/transmission/frame.rs 
b/core/server/src/shard/transmission/frame.rs
index 0b2f980fc..7c0fd2578 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -42,6 +42,7 @@ pub enum ShardResponse {
     CreateUserResponse(User),
     DeletedUser(User),
     GetStatsResponse(Stats),
+    SocketTransferResponse,
     ErrorResponse(IggyError),
 }
 
diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs
index 0f48b1bdb..fec8bf473 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -27,6 +27,8 @@ use iggy_common::{
     CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, Permissions, 
UserStatus,
 };
 
+use std::{net::SocketAddr, os::fd::OwnedFd};
+
 #[allow(clippy::large_enum_variant)]
 pub enum ShardSendRequestResult {
     // TODO: In the future we can add other variants, for example backpressure 
from the destination shard,
@@ -127,6 +129,14 @@ pub enum ShardRequestPayload {
     DeleteSegments {
         segments_count: u32,
     },
+    SocketTransfer {
+        fd: OwnedFd,
+        from_shard: u16,
+        client_id: u32,
+        user_id: u32,
+        address: SocketAddr,
+        initial_data: IggyMessagesBatchMut,
+    },
 }
 
 impl From<ShardRequest> for ShardMessage {
diff --git a/core/server/src/streaming/session.rs 
b/core/server/src/streaming/session.rs
index 1afbd724d..c319e2e12 100644
--- a/core/server/src/streaming/session.rs
+++ b/core/server/src/streaming/session.rs
@@ -28,6 +28,7 @@ pub struct Session {
     user_id: Cell<UserId>,
     active: Cell<bool>,
     pub ip_address: SocketAddr,
+    pub migrated: Cell<bool>,
 }
 
 impl Session {
@@ -36,6 +37,7 @@ impl Session {
             client_id,
             user_id: Cell::new(user_id),
             active: Cell::new(true),
+            migrated: Cell::new(false),
             ip_address,
         }
     }
@@ -60,6 +62,17 @@ impl Session {
         self.active.set(false);
     }
 
+    /// Returns true if this session has been migrated to another shard.
+    ///
+    /// Prevents socket ping-ponging between shards. Subsequent wrong-shard 
requests use message forwarding instead
+    pub fn is_migrated(&self) -> bool {
+        self.migrated.get()
+    }
+
+    pub fn set_migrated(&self) {
+        self.migrated.set(true)
+    }
+
     pub fn clear_user_id(&self) {
         self.set_user_id(u32::MAX);
     }
diff --git a/core/server/src/tcp/connection_handler.rs 
b/core/server/src/tcp/connection_handler.rs
index 7f307bcef..d0bdc5e4f 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -32,12 +32,21 @@ use tracing::{debug, error, info};
 
 const INITIAL_BYTES_LENGTH: usize = 4;
 
+/// Connection lifecycle action after command handling.
+pub enum ConnectionAction {
+    /// Continue handling connection on current shard.
+    Finished,
+
+    /// Connection migrated to another shard, exit without cleanup.
+    Migrated { to_shard: u16 },
+}
+
 pub(crate) async fn handle_connection(
     session: &Session,
     sender: &mut SenderKind,
     shard: &Rc<IggyShard>,
     stop_receiver: Receiver<()>,
-) -> Result<(), ConnectionError> {
+) -> Result<ConnectionAction, ConnectionError> {
     let mut length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
     let mut code_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
     loop {
@@ -49,7 +58,7 @@ pub(crate) async fn handle_connection(
             _ = stop_receiver.recv().fuse() => {
                 info!("Connection stop signal received for session: {}", 
session);
                 let _ = 
sender.send_error_response(IggyError::Disconnected).await;
-                return Ok(());
+                return Ok(ConnectionAction::Finished);
             }
             result = read_future.fuse() => {
                 match result {
@@ -83,11 +92,20 @@ pub(crate) async fn handle_connection(
         debug!("Received a TCP command: {command}, payload size: {length}");
         let cmd_code = command.code();
         match command.handle(sender, length, session, shard).await {
-            Ok(_) => {
-                debug!(
-                    "Command {cmd_code} was handled successfully, session: 
{session}. TCP response was sent."
-                );
-            }
+            Ok(handler_result) => match handler_result {
+                command::HandlerResult::Finished => {
+                    debug!(
+                        "Command {cmd_code} was handled successfully, session: 
{session}. TCP response was sent."
+                    );
+                }
+                command::HandlerResult::Migrated { to_shard } => {
+                    info!(
+                        "Command {cmd_code} was transfer to shard {to_shard}, 
session: {session}. TCP response was sent."
+                    );
+
+                    return Ok(ConnectionAction::Migrated { to_shard });
+                }
+            },
             Err(error) => {
                 // Special handling for GetClusterMetadata when clustering is 
disabled
                 if cmd_code == GET_CLUSTER_METADATA_CODE
@@ -102,6 +120,7 @@ pub(crate) async fn handle_connection(
                     error!(
                         "Command with code {cmd_code} was not handled 
successfully, session: {session}, error: {error}."
                     );
+
                     if let IggyError::ClientNotFound(_) = error {
                         sender.send_error_response(error).await?;
                         debug!("TCP error response was sent to: {session}.");
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index 85fedd50c..c68547301 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -19,9 +19,9 @@
 use crate::configs::tcp::TcpSocketConfig;
 
 use crate::shard::IggyShard;
-use crate::shard::task_registry::ShutdownToken;
+use crate::shard::task_registry::{ShutdownToken, TaskRegistry};
 use crate::shard::transmission::event::ShardEvent;
-use crate::tcp::connection_handler::{handle_connection, handle_error};
+use crate::tcp::connection_handler::{ConnectionAction, handle_connection, 
handle_error};
 use compio::net::{TcpListener, TcpOpts};
 use err_trail::ErrContext;
 use futures::FutureExt;
@@ -150,16 +150,17 @@ async fn accept_loop(
                         let registry = shard.task_registry.clone();
                         let registry_clone = registry.clone();
                         registry.spawn_connection(async move {
-                            if let Err(error) = handle_connection(&session, 
&mut sender, &shard_for_conn, conn_stop_receiver).await {
-                                handle_error(error);
-                            }
-
-                            registry_clone.remove_connection(&client_id);
-                            shard_for_conn.delete_client(session.client_id);
-                            if let Err(error) = sender.shutdown().await {
-                                error!("Failed to shutdown TCP stream for 
client: {}, address: {}. {}", client_id, address, error);
-                            } else {
-                                info!("Successfully closed TCP stream for 
client: {}, address: {}.", client_id, address);
+                            match handle_connection(&session, &mut sender, 
&shard_for_conn, conn_stop_receiver).await {
+                                Ok(ConnectionAction::Migrated { to_shard }) => 
{
+                                    info!("Migrated to shard {to_shard}, 
ignore cleanup connection");
+                                }
+                                Ok(ConnectionAction::Finished) => {
+                                   cleanup_connection(&mut sender, client_id, 
address, &registry_clone, &shard_for_conn).await;
+                                }
+                                Err(err) => {
+                                    handle_error(err);
+                                    cleanup_connection(&mut sender, client_id, 
address, &registry_clone, &shard_for_conn).await;
+                                },
                             }
                         });
                     }
@@ -170,3 +171,25 @@ async fn accept_loop(
     }
     Ok(())
 }
+
+pub async fn cleanup_connection(
+    sender: &mut SenderKind,
+    client_id: u32,
+    address: SocketAddr,
+    registry: &Rc<TaskRegistry>,
+    shard: &IggyShard,
+) {
+    registry.remove_connection(&client_id);
+    shard.delete_client(client_id);
+    if let Err(error) = sender.shutdown().await {
+        error!(
+            "Failed to shutdown for client {}, address {}: {}",
+            client_id, address, error
+        );
+    } else {
+        info!(
+            "Successfully closed for client {}, address {}",
+            client_id, address
+        );
+    }
+}
diff --git a/core/server/src/websocket/websocket_listener.rs 
b/core/server/src/websocket/websocket_listener.rs
index 773e5c9ea..7b691c61b 100644
--- a/core/server/src/websocket/websocket_listener.rs
+++ b/core/server/src/websocket/websocket_listener.rs
@@ -94,7 +94,9 @@ pub async fn start(
         }
     } else {
         // Non-shard0 just handles the event locally
-        shard.handle_event(event).await.ok();
+        crate::shard::handlers::handle_event(&shard, event)
+            .await
+            .ok();
     }
 
     let ws_config = config.to_tungstenite_config();
diff --git a/core/server/src/websocket/websocket_tls_listener.rs 
b/core/server/src/websocket/websocket_tls_listener.rs
index 2432d5a1f..3743162fb 100644
--- a/core/server/src/websocket/websocket_tls_listener.rs
+++ b/core/server/src/websocket/websocket_tls_listener.rs
@@ -97,7 +97,9 @@ pub async fn start(
         }
     } else {
         // Non-shard0 just handles the event locally
-        shard.handle_event(event).await.ok();
+        crate::shard::handlers::handle_event(&shard, event)
+            .await
+            .ok();
     }
 
     // Ensure rustls crypto provider is installed

Reply via email to