This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 96c3b1fb9 feat(server): socket migration across shard (#2476)
96c3b1fb9 is described below
commit 96c3b1fb9a7be6bf0c92ff0cb87dc387ed6a3d08
Author: tungtose <[email protected]>
AuthorDate: Sun Dec 21 18:04:57 2025 +0700
feat(server): socket migration across shard (#2476)
This PR addresses #2386.
Enable TCP socket migration across shards. When a client sends messages
to the wrong shard, migrate the socket to the correct shard to eliminate
cross-shard forwarding overhead.
---
Cargo.lock | 1 +
Cargo.toml | 1 +
core/common/Cargo.toml | 1 +
core/common/src/sender/mod.rs | 27 ++++-
core/common/src/sender/tcp_sender.rs | 49 ++++++---
core/configs/server.toml | 3 +
core/server/Cargo.toml | 2 +-
core/server/src/binary/command.rs | 11 +-
.../cluster/get_cluster_metadata_handler.rs | 8 +-
.../create_consumer_group_handler.rs | 8 +-
.../delete_consumer_group_handler.rs | 8 +-
.../consumer_groups/get_consumer_group_handler.rs | 12 ++-
.../consumer_groups/get_consumer_groups_handler.rs | 8 +-
.../consumer_groups/join_consumer_group_handler.rs | 8 +-
.../leave_consumer_group_handler.rs | 8 +-
.../delete_consumer_offset_handler.rs | 8 +-
.../get_consumer_offset_handler.rs | 12 ++-
.../store_consumer_offset_handler.rs | 8 +-
.../messages/flush_unsaved_buffer_handler.rs | 8 +-
.../handlers/messages/poll_messages_handler.rs | 8 +-
.../handlers/messages/send_messages_handler.rs | 111 ++++++++++++++++++---
.../partitions/create_partitions_handler.rs | 8 +-
.../partitions/delete_partitions_handler.rs | 8 +-
.../create_personal_access_token_handler.rs | 8 +-
.../delete_personal_access_token_handler.rs | 8 +-
.../get_personal_access_tokens_handler.rs | 8 +-
.../login_with_personal_access_token_handler.rs | 8 +-
.../handlers/segments/delete_segments_handler.rs | 8 +-
.../handlers/streams/create_stream_handler.rs | 8 +-
.../handlers/streams/delete_stream_handler.rs | 8 +-
.../binary/handlers/streams/get_stream_handler.rs | 12 ++-
.../binary/handlers/streams/get_streams_handler.rs | 8 +-
.../handlers/streams/purge_stream_handler.rs | 8 +-
.../handlers/streams/update_stream_handler.rs | 8 +-
.../binary/handlers/system/get_client_handler.rs | 12 ++-
.../binary/handlers/system/get_clients_handler.rs | 8 +-
.../src/binary/handlers/system/get_me_handler.rs | 8 +-
.../src/binary/handlers/system/get_snapshot.rs | 8 +-
.../binary/handlers/system/get_stats_handler.rs | 8 +-
.../src/binary/handlers/system/ping_handler.rs | 6 +-
.../binary/handlers/topics/create_topic_handler.rs | 8 +-
.../binary/handlers/topics/delete_topic_handler.rs | 8 +-
.../binary/handlers/topics/get_topic_handler.rs | 12 ++-
.../binary/handlers/topics/get_topics_handler.rs | 8 +-
.../binary/handlers/topics/purge_topic_handler.rs | 8 +-
.../binary/handlers/topics/update_topic_handler.rs | 8 +-
.../handlers/users/change_password_handler.rs | 8 +-
.../binary/handlers/users/create_user_handler.rs | 8 +-
.../binary/handlers/users/delete_user_handler.rs | 8 +-
.../src/binary/handlers/users/get_user_handler.rs | 12 ++-
.../src/binary/handlers/users/get_users_handler.rs | 8 +-
.../binary/handlers/users/login_user_handler.rs | 8 +-
.../binary/handlers/users/logout_user_handler.rs | 8 +-
.../handlers/users/update_permissions_handler.rs | 8 +-
.../binary/handlers/users/update_user_handler.rs | 8 +-
core/server/src/configs/defaults.rs | 1 +
core/server/src/configs/displays.rs | 4 +-
core/server/src/configs/tcp.rs | 1 +
core/server/src/http/http_server.rs | 10 +-
core/server/src/http/http_shard_wrapper.rs | 35 ++++++-
core/server/src/main.rs | 5 +
core/server/src/shard/communication.rs | 2 +-
core/server/src/shard/handlers.rs | 93 ++++++++++++++++-
core/server/src/shard/mod.rs | 18 +---
core/server/src/shard/system/messages.rs | 33 +-----
.../src/shard/tasks/continuous/message_pump.rs | 4 +-
core/server/src/shard/transmission/frame.rs | 1 +
core/server/src/shard/transmission/message.rs | 10 ++
core/server/src/streaming/session.rs | 13 +++
core/server/src/tcp/connection_handler.rs | 33 ++++--
core/server/src/tcp/tcp_listener.rs | 47 ++++++---
core/server/src/websocket/websocket_listener.rs | 4 +-
.../server/src/websocket/websocket_tls_listener.rs | 4 +-
73 files changed, 655 insertions(+), 259 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index adc4d5afe..0f671da6d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4657,6 +4657,7 @@ dependencies = [
"figment",
"human-repr",
"humantime",
+ "nix",
"once_cell",
"rcgen",
"rustls",
diff --git a/Cargo.toml b/Cargo.toml
index dd31ec539..2ccb4b1ea 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -145,6 +145,7 @@ lazy_static = "1.5.0"
log = "0.4.29"
mimalloc = "0.1"
mockall = "0.14.0"
+nix = { version = "0.30.1", features = ["fs", "resource"] }
nonzero_lit = "0.1.2"
once_cell = "1.21.3"
parquet = "=55.2.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index fbfe9b3a5..e935435ef 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -54,6 +54,7 @@ fast-async-mutex = { version = "0.6.7", optional = true }
figment = { workspace = true }
human-repr = { workspace = true }
humantime = { workspace = true }
+nix = { workspace = true }
once_cell = { workspace = true }
rcgen = "0.14.6"
rustls = { workspace = true }
diff --git a/core/common/src/sender/mod.rs b/core/common/src/sender/mod.rs
index 28d73f87a..b1da6e254 100644
--- a/core/common/src/sender/mod.rs
+++ b/core/common/src/sender/mod.rs
@@ -37,7 +37,8 @@ use compio::net::TcpStream;
use compio_quic::{RecvStream, SendStream};
use compio_tls::TlsStream;
use std::future::Future;
-use tracing::debug;
+use std::os::fd::{AsFd, OwnedFd};
+use tracing::{debug, error};
macro_rules! forward_async_methods {
(
@@ -92,7 +93,9 @@ pub enum SenderKind {
impl SenderKind {
pub fn get_tcp_sender(stream: TcpStream) -> Self {
- Self::Tcp(TcpSender { stream })
+ Self::Tcp(TcpSender {
+ stream: Some(stream),
+ })
}
pub fn get_tcp_tls_sender(stream: TlsStream<TcpStream>) -> Self {
@@ -114,6 +117,26 @@ impl SenderKind {
Self::WebSocketTls(stream)
}
+ pub fn take_and_migrate_tcp(&mut self) -> Option<OwnedFd> {
+ match self {
+ SenderKind::Tcp(tcp_sender) => {
+ let stream = tcp_sender.stream.take()?;
+ let poll_fd = stream.into_poll_fd().ok()?;
+
+ let raw_fd = poll_fd.as_fd();
+ let Ok(owned_fd) = nix::unistd::dup(raw_fd) else {
+ // TODO(tungtose): recover tcp stream?
+ error!("Failed to dup fd");
+ return None;
+ };
+
+ Some(owned_fd)
+ }
+ // TODO(tungtose): support TCP TLS
+ _ => None,
+ }
+ }
+
forward_async_methods! {
async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(),
IggyError>, B);
async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>;
diff --git a/core/common/src/sender/tcp_sender.rs
b/core/common/src/sender/tcp_sender.rs
index 974c4c013..3dc008bce 100644
--- a/core/common/src/sender/tcp_sender.rs
+++ b/core/common/src/sender/tcp_sender.rs
@@ -27,34 +27,50 @@ const COMPONENT: &str = "TCP";
#[derive(Debug)]
pub struct TcpSender {
- pub(crate) stream: TcpStream,
+ pub(crate) stream: Option<TcpStream>,
}
impl Sender for TcpSender {
async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(),
IggyError>, B) {
- super::read(&mut self.stream, buffer).await
+ match self.stream.as_mut() {
+ Some(stream) => super::read(stream, buffer).await,
+ None => (Err(IggyError::ConnectionClosed), buffer),
+ }
}
async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
- super::send_empty_ok_response(&mut self.stream).await
+ match self.stream.as_mut() {
+ Some(stream) => super::send_empty_ok_response(stream).await,
+ None => Err(IggyError::ConnectionClosed),
+ }
}
async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(),
IggyError> {
- super::send_ok_response(&mut self.stream, payload).await
+ match self.stream.as_mut() {
+ Some(stream) => super::send_ok_response(stream, payload).await,
+ None => Err(IggyError::ConnectionClosed),
+ }
}
async fn send_error_response(&mut self, error: IggyError) -> Result<(),
IggyError> {
- super::send_error_response(&mut self.stream, error).await
+ match self.stream.as_mut() {
+ Some(stream) => super::send_error_response(stream, error).await,
+ None => Err(IggyError::ConnectionClosed),
+ }
}
async fn shutdown(&mut self) -> Result<(), IggyError> {
- self.stream
- .shutdown()
- .await
- .with_error(|error| {
- format!("{COMPONENT} (error: {error}) - failed to shutdown TCP
stream")
- })
- .map_err(|e| IggyError::IoError(e.to_string()))
+ match self.stream.as_mut() {
+ Some(stream) => stream
+ .shutdown()
+ .await
+ .with_error(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to shutdown
TCP stream")
+ })
+ .map_err(|e| IggyError::IoError(e.to_string())),
+
+ None => Err(IggyError::ConnectionClosed),
+ }
}
async fn send_ok_response_vectored(
@@ -62,6 +78,13 @@ impl Sender for TcpSender {
length: &[u8],
slices: Vec<PooledBuffer>,
) -> Result<(), IggyError> {
- super::send_ok_response_vectored(&mut self.stream, length,
slices).await
+ if self.stream.is_none() {
+ tracing::error!("Tried to send but stream is None!");
+ }
+ match self.stream.as_mut() {
+ Some(stream) => super::send_ok_response_vectored(stream, length,
slices).await,
+
+ None => Err(IggyError::ConnectionClosed),
+ }
}
}
diff --git a/core/configs/server.toml b/core/configs/server.toml
index d795de0a9..cec71ddc7 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -151,6 +151,9 @@ enabled = true
# For example, "127.0.0.1:8090" listens on localhost only on port 8090.
address = "127.0.0.1:8090"
+# Enable TCP socket migration across shards.
+socket_migration = true
+
# Whether to use ipv4 or ipv6
ipv6 = false
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index d10f082f5..bbf15794a 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -78,7 +78,7 @@ lending-iterator = "0.1.7"
mimalloc = { workspace = true, optional = true }
mime_guess = { version = "2.0", optional = true }
moka = { version = "0.12.11", features = ["future"] }
-nix = { version = "0.30", features = ["fs", "resource"] }
+nix = { workspace = true }
opentelemetry = { version = "0.31.0", features = ["trace", "logs"] }
opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] }
opentelemetry-otlp = { version = "0.31.0", features = [
diff --git a/core/server/src/binary/command.rs
b/core/server/src/binary/command.rs
index 1f9389f38..e10fe6ab8 100644
--- a/core/server/src/binary/command.rs
+++ b/core/server/src/binary/command.rs
@@ -121,6 +121,15 @@ define_server_command_enum! {
LeaveConsumerGroup(LeaveConsumerGroup), LEAVE_CONSUMER_GROUP_CODE,
LEAVE_CONSUMER_GROUP, true;
}
+/// Indicates whether a command handler completed normally or migrated the
connection.
+pub enum HandlerResult {
+ /// Command completed, connection stays on current shard.
+ Finished,
+
+ /// Connection was migrated to another shard. Source shard should exit
without cleanup.
+ Migrated { to_shard: u16 },
+}
+
#[enum_dispatch]
pub trait ServerCommandHandler {
/// Return the command code
@@ -134,7 +143,7 @@ pub trait ServerCommandHandler {
length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError>;
+ ) -> Result<HandlerResult, IggyError>;
}
pub trait BinaryServerCommand {
diff --git
a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
index 0de5a4ba1..af6c7a9a0 100644
--- a/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
+++ b/core/server/src/binary/handlers/cluster/get_cluster_metadata_handler.rs
@@ -18,7 +18,9 @@
use std::rc::Rc;
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
use crate::streaming::session::Session;
@@ -40,14 +42,14 @@ impl ServerCommandHandler for GetClusterMetadata {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let cluster_metadata = shard.get_cluster_metadata(session)?;
let response = cluster_metadata.to_bytes();
sender.send_ok_response(&response).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index 38cd1d5f5..0c6e9a763 100644
---
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::consumer_groups::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -45,7 +47,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let cg = shard.create_consumer_group(
session,
@@ -86,7 +88,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
|(root, members)| mapper::map_consumer_group(root, members),
);
sender.send_ok_response(&response).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index e406cb038..1da98ac44 100644
---
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::consumer_groups::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -45,7 +47,7 @@ impl ServerCommandHandler for DeleteConsumerGroup {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let cg = shard.delete_consumer_group(session, &self.stream_id,
&self.topic_id, &self.group_id).with_error(|error| {
format!(
@@ -123,7 +125,7 @@ impl ServerCommandHandler for DeleteConsumerGroup {
)
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
index cae09dced..7156a4d53 100644
---
a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::shard::IggyShard;
@@ -40,7 +42,7 @@ impl ServerCommandHandler for GetConsumerGroup {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard.ensure_authenticated(session)?;
let exists = shard
@@ -48,7 +50,7 @@ impl ServerCommandHandler for GetConsumerGroup {
.is_ok();
if !exists {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
}
let numeric_topic_id = shard.streams.with_topic_by_id(
&self.stream_id,
@@ -65,7 +67,7 @@ impl ServerCommandHandler for GetConsumerGroup {
.is_ok();
if !has_permission {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
}
let consumer_group = shard.streams.with_consumer_group_by_id(
@@ -75,7 +77,7 @@ impl ServerCommandHandler for GetConsumerGroup {
|(root, members)| mapper::map_consumer_group(root, members),
);
sender.send_ok_response(&consumer_group).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
index 844f959db..d59f33c97 100644
---
a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::shard::IggyShard;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for GetConsumerGroups {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard.ensure_authenticated(session)?;
shard.ensure_topic_exists(&self.stream_id, &self.topic_id)?;
@@ -69,7 +71,7 @@ impl ServerCommandHandler for GetConsumerGroups {
})
});
sender.send_ok_response(&consumer_groups).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
index 4eb2b9196..ad68aa333 100644
---
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::consumer_groups::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for JoinConsumerGroup {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard
.join_consumer_group(
@@ -58,7 +60,7 @@ impl ServerCommandHandler for JoinConsumerGroup {
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index 9601d1b7c..a39ede547 100644
---
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -17,7 +17,9 @@
*/
use super::COMPONENT;
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use iggy_common::SenderKind;
@@ -42,7 +44,7 @@ impl ServerCommandHandler for LeaveConsumerGroup {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard
@@ -60,7 +62,7 @@ impl ServerCommandHandler for LeaveConsumerGroup {
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
index caf154f00..ee7aa269b 100644
---
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
+++
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::consumer_offsets::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
@@ -39,7 +41,7 @@ impl ServerCommandHandler for DeleteConsumerOffset {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard
.delete_consumer_offset(
@@ -54,7 +56,7 @@ impl ServerCommandHandler for DeleteConsumerOffset {
self.topic_id, self.stream_id, self.partition_id, session
))?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
index cc79b2f52..29ee6f310 100644
---
a/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
+++
b/core/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::shard::IggyShard;
@@ -39,7 +41,7 @@ impl ServerCommandHandler for GetConsumerOffset {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let Ok(offset) = shard
.get_consumer_offset(
@@ -52,17 +54,17 @@ impl ServerCommandHandler for GetConsumerOffset {
.await
else {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
};
let Some(offset) = offset else {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
};
let offset = mapper::map_consumer_offset(&offset);
sender.send_ok_response(&offset).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
index b422c3370..cbc99df99 100644
---
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
+++
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
@@ -18,7 +18,9 @@
use std::rc::Rc;
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::consumer_offsets::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for StoreConsumerOffset {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard
.store_consumer_offset(
@@ -57,7 +59,7 @@ impl ServerCommandHandler for StoreConsumerOffset {
self.stream_id, self.topic_id, self.partition_id, self.offset,
session
))?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
index 85099ecfa..30c70cd47 100644
--- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
+++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::messages::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
@@ -39,7 +41,7 @@ impl ServerCommandHandler for FlushUnsavedBuffer {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let user_id = session.get_user_id();
@@ -64,7 +66,7 @@ impl ServerCommandHandler for FlushUnsavedBuffer {
)
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index 85abb0450..0839e3a55 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
use crate::shard::system::messages::PollingArgs;
@@ -53,7 +55,7 @@ impl ServerCommandHandler for PollMessages {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let PollMessages {
consumer,
@@ -111,7 +113,7 @@ impl ServerCommandHandler for PollMessages {
sender
.send_ok_response_vectored(&response_length_bytes, bufs)
.await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 017056042..1d2e5d9d0 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -16,20 +16,23 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommandHandler};
+use crate::binary::command::{BinaryServerCommand, HandlerResult,
ServerCommandHandler};
use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::transmission::message::{ShardMessage, ShardRequest,
ShardRequestPayload};
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
use crate::streaming::session::Session;
+use crate::streaming::{streams, topics};
use anyhow::Result;
use compio::buf::{IntoInner as _, IoBuf};
-use iggy_common::INDEX_SIZE;
use iggy_common::Identifier;
use iggy_common::PooledBuffer;
use iggy_common::SenderKind;
use iggy_common::Sizeable;
+use iggy_common::{INDEX_SIZE, PartitioningKind};
use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
use std::rc::Rc;
-use tracing::instrument;
+use tracing::{debug, error, info, instrument};
impl ServerCommandHandler for SendMessages {
fn code(&self) -> u32 {
@@ -49,7 +52,7 @@ impl ServerCommandHandler for SendMessages {
length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
let total_payload_size = length as usize - std::mem::size_of::<u32>();
let metadata_len_field_size = std::mem::size_of::<u32>();
@@ -107,19 +110,103 @@ impl ServerCommandHandler for SendMessages {
);
batch.validate()?;
+ shard.ensure_topic_exists(&self.stream_id, &self.topic_id)?;
+
+ let numeric_stream_id = shard
+ .streams
+ .with_stream_by_id(&self.stream_id,
streams::helpers::get_stream_id());
+
+ let numeric_topic_id = shard.streams.with_topic_by_id(
+ &self.stream_id,
+ &self.topic_id,
+ topics::helpers::get_topic_id(),
+ );
+
+ // TODO(tungtose): dry this code && get partition_id below have a side
effect
+ let partition_id = shard.streams.with_topic_by_id(
+ &self.stream_id,
+ &self.topic_id,
+ |(root, auxilary, ..)| match self.partitioning.kind {
+ PartitioningKind::Balanced => {
+ let upperbound = root.partitions().len();
+ let pid = auxilary.get_next_partition_id(upperbound);
+ Ok(pid)
+ }
+ PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
+ self.partitioning.value[..self.partitioning.length as
usize]
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ ) as usize),
+ PartitioningKind::MessagesKey => {
+ let upperbound = root.partitions().len();
+ Ok(
+
topics::helpers::calculate_partition_id_by_messages_key_hash(
+ upperbound,
+ &self.partitioning.value,
+ ),
+ )
+ }
+ },
+ )?;
+
+ let namespace = IggyNamespace::new(numeric_stream_id,
numeric_topic_id, partition_id);
let user_id = session.get_user_id();
+ let unsupport_socket_transfer = matches!(
+ self.partitioning.kind,
+ PartitioningKind::Balanced | PartitioningKind::MessagesKey
+ );
+ let enabled_socket_migration = shard.config.tcp.socket_migration;
+
+ if enabled_socket_migration
+ && !(session.is_migrated() || unsupport_socket_transfer)
+ && let Some(target_shard) = shard.find_shard(&namespace)
+ && target_shard.id != shard.id
+ {
+ debug!(
+ "TCP wrong shared detected: migrating from_shard {}, to_shard
{}",
+ shard.id, target_shard.id
+ );
+
+ if let Some(fd) = sender.take_and_migrate_tcp() {
+ let payload = ShardRequestPayload::SocketTransfer {
+ fd,
+ from_shard: shard.id,
+ client_id: session.client_id,
+ user_id,
+ address: session.ip_address,
+ initial_data: batch,
+ };
+
+ let request = ShardRequest::new(
+ self.stream_id.clone(),
+ self.topic_id.clone(),
+ partition_id,
+ payload,
+ );
+
+ let socket_transfer_msg = ShardMessage::Request(request);
+
+ if let Err(e) = shard
+ .send_request_to_shard_or_recoil(Some(&namespace),
socket_transfer_msg)
+ .await
+ {
+ error!("tranfer socket to another shard failed, drop
connection. {e:?}");
+ return Ok(HandlerResult::Finished);
+ }
+
+ info!("Sending socket transfer to shard {}", target_shard.id);
+ return Ok(HandlerResult::Migrated {
+ to_shard: target_shard.id,
+ });
+ }
+ }
+
shard
- .append_messages(
- user_id,
- self.stream_id,
- self.topic_id,
- &self.partitioning,
- batch,
- )
+ .append_messages(user_id, self.stream_id, self.topic_id,
partition_id, batch)
.await?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index a6792939b..0ead0c4ad 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::partitions::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -45,7 +47,7 @@ impl ServerCommandHandler for CreatePartitions {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
// Acquire partition lock to serialize filesystem operations
@@ -94,7 +96,7 @@ impl ServerCommandHandler for CreatePartitions {
)
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 63bca4e21..fe6740136 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::partitions::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -43,7 +45,7 @@ impl ServerCommandHandler for DeletePartitions {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
@@ -91,7 +93,7 @@ impl ServerCommandHandler for DeletePartitions {
)
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index 5a1f3b652..c52ddd4eb 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::personal_access_tokens::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -45,7 +47,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let (personal_access_token, token) = shard
@@ -83,7 +85,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
)
})?;
sender.send_ok_response(&bytes).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index 0763f574d..9a588ccbe 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::personal_access_tokens::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let token_name = self.name.clone();
@@ -71,7 +73,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
"{COMPONENT} (error: {error}) - failed to apply delete personal access token with name: {token_name}, session: {session}"
)})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
index ab453a6ce..3c5df6417 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/get_personal_access_tokens_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::personal_access_tokens::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -40,7 +42,7 @@ impl ServerCommandHandler for GetPersonalAccessTokens {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let personal_access_tokens = shard
.get_personal_access_tokens(session)
@@ -49,7 +51,7 @@ impl ServerCommandHandler for GetPersonalAccessTokens {
})?;
let personal_access_tokens =
mapper::map_personal_access_tokens(personal_access_tokens);
sender.send_ok_response(&personal_access_tokens).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
index 18cd634f6..921a813e4 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
@@ -18,7 +18,9 @@
use crate::shard::IggyShard;
use std::rc::Rc;
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::personal_access_tokens::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let user = shard
.login_with_personal_access_token(&self.token, Some(session))
@@ -57,7 +59,7 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
})?;
let identity_info = mapper::map_identity_info(user.id);
sender.send_ok_response(&identity_info).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index b75c71a33..2d161d411 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::partitions::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
@@ -47,7 +49,7 @@ impl ServerCommandHandler for DeleteSegments {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let stream_id = self.stream_id.clone();
@@ -123,7 +125,7 @@ impl ServerCommandHandler for DeleteSegments {
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs
b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index cde50e3ce..e56f87e64 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::streams::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -50,7 +52,7 @@ impl ServerCommandHandler for CreateStream {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let request = ShardRequest {
@@ -136,7 +138,7 @@ impl ServerCommandHandler for CreateStream {
},
}
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index 67d7add90..76bc9f8ca 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::streams::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
@@ -48,7 +50,7 @@ impl ServerCommandHandler for DeleteStream {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let stream_id = self.stream_id.clone();
let request = ShardRequest {
@@ -119,7 +121,7 @@ impl ServerCommandHandler for DeleteStream {
},
}
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/streams/get_stream_handler.rs
b/core/server/src/binary/handlers/streams/get_stream_handler.rs
index da10b3f41..8d6f3f8ce 100644
--- a/core/server/src/binary/handlers/streams/get_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/get_stream_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::shard::IggyShard;
@@ -42,13 +44,13 @@ impl ServerCommandHandler for GetStream {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard.ensure_authenticated(session)?;
let exists = shard.ensure_stream_exists(&self.stream_id).is_ok();
if !exists {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
}
let stream_id = shard
.streams
@@ -67,13 +69,13 @@ impl ServerCommandHandler for GetStream {
.is_ok();
if !has_permission {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
}
let response = shard
.streams
.with_components_by_id(stream_id, |(root, stats)|
mapper::map_stream(&root, &stats));
sender.send_ok_response(&response).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/streams/get_streams_handler.rs
b/core/server/src/binary/handlers/streams/get_streams_handler.rs
index a3305663b..06e7f28c9 100644
--- a/core/server/src/binary/handlers/streams/get_streams_handler.rs
+++ b/core/server/src/binary/handlers/streams/get_streams_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::streams::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -42,7 +44,7 @@ impl ServerCommandHandler for GetStreams {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard.ensure_authenticated(session)?;
shard
@@ -61,7 +63,7 @@ impl ServerCommandHandler for GetStreams {
mapper::map_streams(&roots, &stats)
});
sender.send_ok_response(&response).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
index 861140bbf..b8d3cfc0c 100644
--- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::streams::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -43,7 +45,7 @@ impl ServerCommandHandler for PurgeStream {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let stream_id = self.stream_id.clone();
@@ -64,7 +66,7 @@ impl ServerCommandHandler for PurgeStream {
format!("{COMPONENT} (error: {error}) - failed to apply purge stream with id: {stream_id}, session: {session}")
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs
b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index eeaca94e3..c080412a7 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::streams::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -43,7 +45,7 @@ impl ServerCommandHandler for UpdateStream {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let stream_id = self.stream_id.clone();
shard
@@ -65,7 +67,7 @@ impl ServerCommandHandler for UpdateStream {
format!("{COMPONENT} (error: {error}) - failed to apply update stream with id: {stream_id}, session: {session}")
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/system/get_client_handler.rs
b/core/server/src/binary/handlers/system/get_client_handler.rs
index bdcc8ccc2..b9e16aef3 100644
--- a/core/server/src/binary/handlers/system/get_client_handler.rs
+++ b/core/server/src/binary/handlers/system/get_client_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::shard::IggyShard;
@@ -38,23 +40,23 @@ impl ServerCommandHandler for GetClient {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let Ok(client) = shard.get_client(session, self.client_id) else {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
};
let Some(client) = client else {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
};
let bytes = mapper::map_client(&client);
sender.send_ok_response(&bytes).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/system/get_clients_handler.rs
b/core/server/src/binary/handlers/system/get_clients_handler.rs
index 1a1865641..232a03f6b 100644
--- a/core/server/src/binary/handlers/system/get_clients_handler.rs
+++ b/core/server/src/binary/handlers/system/get_clients_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::system::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -40,7 +42,7 @@ impl ServerCommandHandler for GetClients {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let clients = shard.get_clients(session).with_error(|error| {
@@ -48,7 +50,7 @@ impl ServerCommandHandler for GetClients {
})?;
let clients = mapper::map_clients(clients).await;
sender.send_ok_response(&clients).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/system/get_me_handler.rs
b/core/server/src/binary/handlers/system/get_me_handler.rs
index ba4bb3b79..11b35c5a2 100644
--- a/core/server/src/binary/handlers/system/get_me_handler.rs
+++ b/core/server/src/binary/handlers/system/get_me_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::system::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -39,7 +41,7 @@ impl ServerCommandHandler for GetMe {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
let Some(client) = shard
.get_client(session, session.client_id)
.with_error(|error| {
@@ -52,7 +54,7 @@ impl ServerCommandHandler for GetMe {
let bytes = mapper::map_client(&client);
sender.send_ok_response(&bytes).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/system/get_snapshot.rs
b/core/server/src/binary/handlers/system/get_snapshot.rs
index f352eacb7..cc2163b91 100644
--- a/core/server/src/binary/handlers/system/get_snapshot.rs
+++ b/core/server/src/binary/handlers/system/get_snapshot.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
use crate::streaming::session::Session;
@@ -38,7 +40,7 @@ impl ServerCommandHandler for GetSnapshot {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let snapshot = shard
@@ -46,7 +48,7 @@ impl ServerCommandHandler for GetSnapshot {
.await?;
let bytes = Bytes::copy_from_slice(&snapshot.0);
sender.send_ok_response(&bytes).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/system/get_stats_handler.rs
b/core/server/src/binary/handlers/system/get_stats_handler.rs
index a318a4c98..a655a377d 100644
--- a/core/server/src/binary/handlers/system/get_stats_handler.rs
+++ b/core/server/src/binary/handlers/system/get_stats_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::system::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -44,7 +46,7 @@ impl ServerCommandHandler for GetStats {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
// Route GetStats to shard0 only
@@ -82,7 +84,7 @@ impl ServerCommandHandler for GetStats {
},
}
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/system/ping_handler.rs
b/core/server/src/binary/handlers/system/ping_handler.rs
index e1c1fbe7e..e22fa7ba8 100644
--- a/core/server/src/binary/handlers/system/ping_handler.rs
+++ b/core/server/src/binary/handlers/system/ping_handler.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommandHandler};
+use crate::binary::command::{BinaryServerCommand, HandlerResult,
ServerCommandHandler};
use crate::shard::IggyShard;
use crate::streaming::session::Session;
use anyhow::Result;
@@ -38,7 +38,7 @@ impl ServerCommandHandler for Ping {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
if let Some(mut client) =
shard.client_manager.try_get_client_mut(session.client_id) {
let now = IggyTimestamp::now();
@@ -47,7 +47,7 @@ impl ServerCommandHandler for Ping {
}
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 8168d873b..b5b5c96a2 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::topics::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -51,7 +53,7 @@ impl ServerCommandHandler for CreateTopic {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let request = ShardRequest {
@@ -192,7 +194,7 @@ impl ServerCommandHandler for CreateTopic {
},
}
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index c25aa005b..ea3d21cdd 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::topics::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -50,7 +52,7 @@ impl ServerCommandHandler for DeleteTopic {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let request = ShardRequest {
@@ -132,7 +134,7 @@ impl ServerCommandHandler for DeleteTopic {
},
}
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs
b/core/server/src/binary/handlers/topics/get_topic_handler.rs
index 14fcabc67..fcc42ac0b 100644
--- a/core/server/src/binary/handlers/topics/get_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::shard::IggyShard;
@@ -40,7 +42,7 @@ impl ServerCommandHandler for GetTopic {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard.ensure_authenticated(session)?;
let exists = shard
@@ -48,7 +50,7 @@ impl ServerCommandHandler for GetTopic {
.is_ok();
if !exists {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
}
let numeric_stream_id = shard
@@ -69,7 +71,7 @@ impl ServerCommandHandler for GetTopic {
.is_ok();
if !has_permission {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
}
let response =
@@ -79,7 +81,7 @@ impl ServerCommandHandler for GetTopic {
mapper::map_topic(&root, &stats)
});
sender.send_ok_response(&response).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs
b/core/server/src/binary/handlers/topics/get_topics_handler.rs
index 37123afce..96c3f9325 100644
--- a/core/server/src/binary/handlers/topics/get_topics_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::shard::IggyShard;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for GetTopics {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard.ensure_authenticated(session)?;
shard.ensure_stream_exists(&self.stream_id)?;
@@ -60,7 +62,7 @@ impl ServerCommandHandler for GetTopics {
})
});
sender.send_ok_response(&response).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
index abc1651b4..2cea843ed 100644
--- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::topics::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::shard::IggyShard;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for PurgeTopic {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let topic_id = self.topic_id.clone();
let stream_id = self.stream_id.clone();
@@ -72,7 +74,7 @@ impl ServerCommandHandler for PurgeTopic {
)
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs
b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index a553cefff..6dfa2281d 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::topics::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -48,7 +50,7 @@ impl ServerCommandHandler for UpdateTopic {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let request = ShardRequest {
@@ -165,7 +167,7 @@ impl ServerCommandHandler for UpdateTopic {
},
}
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs
b/core/server/src/binary/handlers/users/change_password_handler.rs
index fb03a46eb..dc512e172 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::users::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -45,7 +47,7 @@ impl ServerCommandHandler for ChangePassword {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
info!("Changing password for user with ID: {}...", self.user_id);
@@ -91,7 +93,7 @@ impl ServerCommandHandler for ChangePassword {
)
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs
b/core/server/src/binary/handlers/users/create_user_handler.rs
index f4cdcec73..0caa873f1 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::users::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -50,7 +52,7 @@ impl ServerCommandHandler for CreateUser {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let request = ShardRequest {
@@ -167,7 +169,7 @@ impl ServerCommandHandler for CreateUser {
},
}
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs
b/core/server/src/binary/handlers/users/delete_user_handler.rs
index abf74cb4a..55b0f103d 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -18,7 +18,9 @@
use std::rc::Rc;
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::users::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -49,7 +51,7 @@ impl ServerCommandHandler for DeleteUser {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let request = ShardRequest {
@@ -122,7 +124,7 @@ impl ServerCommandHandler for DeleteUser {
},
}
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/users/get_user_handler.rs
b/core/server/src/binary/handlers/users/get_user_handler.rs
index 3a10279a8..97322a340 100644
--- a/core/server/src/binary/handlers/users/get_user_handler.rs
+++ b/core/server/src/binary/handlers/users/get_user_handler.rs
@@ -18,7 +18,9 @@
use std::rc::Rc;
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::shard::IggyShard;
@@ -39,20 +41,20 @@ impl ServerCommandHandler for GetUser {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let Ok(user) = shard.find_user(session, &self.user_id) else {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
};
let Some(user) = user else {
sender.send_empty_ok_response().await?;
- return Ok(());
+ return Ok(HandlerResult::Finished);
};
let bytes = mapper::map_user(&user);
sender.send_ok_response(&bytes).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/users/get_users_handler.rs
b/core/server/src/binary/handlers/users/get_users_handler.rs
index bd1a5b2d4..b67f85875 100644
--- a/core/server/src/binary/handlers/users/get_users_handler.rs
+++ b/core/server/src/binary/handlers/users/get_users_handler.rs
@@ -18,7 +18,9 @@
use std::rc::Rc;
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::users::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -41,14 +43,14 @@ impl ServerCommandHandler for GetUsers {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let users = shard.get_users(session).await.with_error(|error| {
format!("{COMPONENT} (error: {error}) - failed to get users,
session: {session}")
})?;
let users = mapper::map_users(users);
sender.send_ok_response(&users).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs
b/core/server/src/binary/handlers/users/login_user_handler.rs
index 53bd4bceb..9c32126a5 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -16,7 +16,9 @@
* under the License.
*/
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::users::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
@@ -41,7 +43,7 @@ impl ServerCommandHandler for LoginUser {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
if shard.is_shutting_down() {
warn!("Rejecting login request during shutdown");
return Err(IggyError::Disconnected);
@@ -64,7 +66,7 @@ impl ServerCommandHandler for LoginUser {
let identity_info = mapper::map_identity_info(user.id);
sender.send_ok_response(&identity_info).await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs
b/core/server/src/binary/handlers/users/logout_user_handler.rs
index 850312f1e..92c1cccb7 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -18,7 +18,9 @@
use std::rc::Rc;
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::users::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -43,7 +45,7 @@ impl ServerCommandHandler for LogoutUser {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
info!("Logging out user with ID: {}...", session.get_user_id());
shard.logout_user(session).with_error(|error| {
@@ -52,7 +54,7 @@ impl ServerCommandHandler for LogoutUser {
info!("Logged out user with ID: {}.", session.get_user_id());
session.clear_user_id();
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git
a/core/server/src/binary/handlers/users/update_permissions_handler.rs
b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index bc07707ef..948ce8e41 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -18,7 +18,9 @@
use std::rc::Rc;
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::users::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -45,7 +47,7 @@ impl ServerCommandHandler for UpdatePermissions {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
shard
@@ -68,7 +70,7 @@ impl ServerCommandHandler for UpdatePermissions {
)
.await?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs
b/core/server/src/binary/handlers/users/update_user_handler.rs
index c89878de2..f4e12a0f5 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -18,7 +18,9 @@
use std::rc::Rc;
-use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
+use crate::binary::command::{
+ BinaryServerCommand, HandlerResult, ServerCommand, ServerCommandHandler,
+};
use crate::binary::handlers::users::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
@@ -45,7 +47,7 @@ impl ServerCommandHandler for UpdateUser {
_length: u32,
session: &Session,
shard: &Rc<IggyShard>,
- ) -> Result<(), IggyError> {
+ ) -> Result<HandlerResult, IggyError> {
debug!("session: {session}, command: {self}");
let user =shard
@@ -82,7 +84,7 @@ impl ServerCommandHandler for UpdateUser {
)
})?;
sender.send_empty_ok_response().await?;
- Ok(())
+ Ok(HandlerResult::Finished)
}
}
diff --git a/core/server/src/configs/defaults.rs
b/core/server/src/configs/defaults.rs
index b2d810e1e..f6d9d83e5 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -129,6 +129,7 @@ impl Default for TcpConfig {
ipv6: SERVER_CONFIG.tcp.ipv_6,
tls: TcpTlsConfig::default(),
socket: TcpSocketConfig::default(),
+ socket_migration: SERVER_CONFIG.tcp.socket_migration,
}
}
}
diff --git a/core/server/src/configs/displays.rs
b/core/server/src/configs/displays.rs
index a3e64bc62..8973b356a 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -263,8 +263,8 @@ impl Display for TcpConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ enabled: {}, address: {}, ipv6: {}, tls: {}, socket: {} }}",
- self.enabled, self.address, self.ipv6, self.tls, self.socket,
+ "{{ enabled: {}, address: {}, ipv6: {}, tls: {}, socket: {},
socket_migration: {} }}",
+ self.enabled, self.address, self.ipv6, self.tls, self.socket,
self.socket_migration
)
}
}
diff --git a/core/server/src/configs/tcp.rs b/core/server/src/configs/tcp.rs
index eeec77793..02ec8bbd5 100644
--- a/core/server/src/configs/tcp.rs
+++ b/core/server/src/configs/tcp.rs
@@ -28,6 +28,7 @@ pub struct TcpConfig {
pub ipv6: bool,
pub tls: TcpTlsConfig,
pub socket: TcpSocketConfig,
+ pub socket_migration: bool,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
diff --git a/core/server/src/http/http_server.rs
b/core/server/src/http/http_server.rs
index 3a114aecf..23eefbdab 100644
--- a/core/server/src/http/http_server.rs
+++ b/core/server/src/http/http_server.rs
@@ -147,7 +147,10 @@ pub async fn start_http_server(
protocol: TransportProtocol::Http,
address,
};
- shard.handle_event(event).await.ok();
+
+ crate::shard::handlers::handle_event(&shard, event)
+ .await
+ .ok();
let service =
app.into_make_service_with_connect_info::<CompioSocketAddr>();
@@ -191,7 +194,10 @@ pub async fn start_http_server(
protocol: TransportProtocol::Http,
address,
};
- shard.handle_event(event).await.ok();
+
+ crate::shard::handlers::handle_event(&shard, event)
+ .await
+ .ok();
let service = app.into_make_service_with_connect_info::<SocketAddr>();
let handle = axum_server::Handle::new();
diff --git a/core/server/src/http/http_shard_wrapper.rs
b/core/server/src/http/http_shard_wrapper.rs
index b67fa8981..f279b51be 100644
--- a/core/server/src/http/http_shard_wrapper.rs
+++ b/core/server/src/http/http_shard_wrapper.rs
@@ -18,8 +18,8 @@
use std::rc::Rc;
use iggy_common::{
- Consumer, ConsumerOffsetInfo, Identifier, IggyError, IggyExpiry,
Partitioning, Permissions,
- Stats, UserId, UserStatus,
+ Consumer, ConsumerOffsetInfo, Identifier, IggyError, IggyExpiry,
Partitioning,
+ PartitioningKind, Permissions, Stats, UserId, UserStatus,
};
use send_wrapper::SendWrapper;
@@ -28,6 +28,7 @@ use crate::shard::system::messages::PollingArgs;
use crate::state::command::EntryCommand;
use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet};
+use crate::streaming::topics;
use crate::streaming::users::user::User;
use crate::{shard::IggyShard, streaming::session::Session};
@@ -299,11 +300,39 @@ impl HttpSafeShard {
partitioning: &Partitioning,
batch: IggyMessagesBatchMut,
) -> Result<(), IggyError> {
+ self.shard().ensure_topic_exists(&stream_id, &topic_id)?;
+
+ let partition_id = self.shard().streams.with_topic_by_id(
+ &stream_id,
+ &topic_id,
+ |(root, auxilary, ..)| match partitioning.kind {
+ PartitioningKind::Balanced => {
+ let upperbound = root.partitions().len();
+ let pid = auxilary.get_next_partition_id(upperbound);
+ Ok(pid)
+ }
+ PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
+ partitioning.value[..partitioning.length as usize]
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ ) as usize),
+ PartitioningKind::MessagesKey => {
+ let upperbound = root.partitions().len();
+ Ok(
+
topics::helpers::calculate_partition_id_by_messages_key_hash(
+ upperbound,
+ &partitioning.value,
+ ),
+ )
+ }
+ },
+ )?;
+
let future = SendWrapper::new(self.shard().append_messages(
user_id,
stream_id,
topic_id,
- partitioning,
+ partition_id,
batch,
));
future.await
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 5ce51716d..23e7c515d 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -296,6 +296,11 @@ fn main() -> Result<(), ServerError> {
let metrics = Metrics::init();
// TWELFTH DISCRETE LOADING STEP.
+ info!(
+ "Enable TCP socket migration across shards: {}.",
+ config.tcp.socket_migration
+ );
+
info!("Starting {} shard(s)", shard_assignment.len());
let (connections, shutdown_handles) =
create_shard_connections(&shard_assignment);
let shards_count = shard_assignment.len();
diff --git a/core/server/src/shard/communication.rs
b/core/server/src/shard/communication.rs
index d7e73c5e4..59dc11183 100644
--- a/core/server/src/shard/communication.rs
+++ b/core/server/src/shard/communication.rs
@@ -137,7 +137,7 @@ impl IggyShard {
}
}
- fn find_shard(&self, namespace: &IggyNamespace) ->
Option<&ShardConnector<ShardFrame>> {
+ pub fn find_shard(&self, namespace: &IggyNamespace) ->
Option<&ShardConnector<ShardFrame>> {
self.shards_table.get(namespace).map(|shard_id| {
self.shards
.iter()
diff --git a/core/server/src/shard/handlers.rs
b/core/server/src/shard/handlers.rs
index 61596b643..087b7a54d 100644
--- a/core/server/src/shard/handlers.rs
+++ b/core/server/src/shard/handlers.rs
@@ -27,12 +27,19 @@ use crate::{
},
},
streaming::{session::Session, traits::MainOps},
+ tcp::{
+ connection_handler::{ConnectionAction, handle_connection,
handle_error},
+ tcp_listener::cleanup_connection,
+ },
};
-use iggy_common::{Identifier, IggyError, TransportProtocol};
+use compio_net::TcpStream;
+use iggy_common::{Identifier, IggyError, SenderKind, TransportProtocol};
+use nix::sys::stat::SFlag;
+use std::os::fd::{FromRawFd, IntoRawFd};
use tracing::info;
pub(super) async fn handle_shard_message(
- shard: &IggyShard,
+ shard: &Rc<IggyShard>,
message: ShardMessage,
) -> Option<ShardResponse> {
match message {
@@ -48,7 +55,7 @@ pub(super) async fn handle_shard_message(
}
async fn handle_request(
- shard: &IggyShard,
+ shard: &Rc<IggyShard>,
request: ShardRequest,
) -> Result<ShardResponse, IggyError> {
let stream_id = request.stream_id;
@@ -312,10 +319,88 @@ async fn handle_request(
shard.broadcast_event_to_all_shards(event).await?;
Ok(ShardResponse::DeleteStreamResponse(stream))
}
+ ShardRequestPayload::SocketTransfer {
+ fd,
+ from_shard,
+ client_id,
+ user_id,
+ address,
+ initial_data,
+ } => {
+ info!(
+ "Received socket transfer msg, fd: {fd:?}, from_shard:
{from_shard}, address: {address}"
+ );
+
+ // `fd` is an `OwnedFd` (never -1); confirm it is an open socket.
+ let stat = nix::sys::stat::fstat(&fd)
+ .map_err(|e| IggyError::IoError(format!("Invalid fd: {}",
e)))?;
+
+ if
!SFlag::from_bits_truncate(stat.st_mode).contains(SFlag::S_IFSOCK) {
+ return Err(IggyError::IoError(format!("fd {:?} is not a
socket", fd)));
+ }
+
+ // SAFETY: fd is an owned, open socket; TcpStream takes ownership.
+ let tcp_stream = unsafe { TcpStream::from_raw_fd(fd.into_raw_fd())
};
+ let session = shard.add_client(&address, TransportProtocol::Tcp);
+ session.set_user_id(user_id);
+ session.set_migrated();
+
+ let mut sender = SenderKind::get_tcp_sender(tcp_stream);
+ let conn_stop_receiver =
shard.task_registry.add_connection(session.client_id);
+ let shard_for_conn = shard.clone();
+ let registry = shard.task_registry.clone();
+ let registry_clone = registry.clone();
+
+ let ns = IggyFullNamespace::new(stream_id, topic_id, partition_id);
+ let batch = shard.maybe_encrypt_messages(initial_data)?;
+ let messages_count = batch.count();
+
+ shard
+ .streams
+ .append_messages(&shard.config.system, &shard.task_registry,
&ns, batch)
+ .await?;
+
+ shard.metrics.increment_messages(messages_count as u64);
+
+ sender.send_empty_ok_response().await?;
+
+ registry.spawn_connection(async move {
+ match handle_connection(&session, &mut sender,
&shard_for_conn, conn_stop_receiver)
+ .await
+ {
+ Ok(ConnectionAction::Migrated { to_shard }) => {
+ info!("Migrated to shard {to_shard}, ignore cleanup
connection");
+ }
+ Ok(ConnectionAction::Finished) => {
+ cleanup_connection(
+ &mut sender,
+ client_id,
+ address,
+ ®istry_clone,
+ &shard_for_conn,
+ )
+ .await;
+ }
+ Err(err) => {
+ handle_error(err);
+ cleanup_connection(
+ &mut sender,
+ client_id,
+ address,
+ ®istry_clone,
+ &shard_for_conn,
+ )
+ .await;
+ }
+ }
+ });
+
+ Ok(ShardResponse::SocketTransferResponse)
+ }
}
}
-pub(crate) async fn handle_event(shard: &IggyShard, event: ShardEvent) ->
Result<(), IggyError> {
+pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) ->
Result<(), IggyError> {
match event {
ShardEvent::DeletedPartitions {
stream_id,
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 32e07d76d..f48279362 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -24,7 +24,7 @@ pub mod tasks;
pub mod transmission;
mod communication;
-mod handlers;
+pub mod handlers;
// Re-export for backwards compatibility
pub use communication::calculate_shard_assignment;
@@ -34,13 +34,7 @@ use crate::{
configs::server::ServerConfig,
io::fs_locks::FsLocks,
shard::{
- namespace::IggyNamespace,
- task_registry::TaskRegistry,
- transmission::{
- event::ShardEvent,
- frame::{ShardFrame, ShardResponse},
- message::ShardMessage,
- },
+ namespace::IggyNamespace, task_registry::TaskRegistry,
transmission::frame::ShardFrame,
},
slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
state::file::FileState,
@@ -292,14 +286,6 @@ impl IggyShard {
self.shards.len() as u32
}
- pub async fn handle_shard_message(&self, message: ShardMessage) ->
Option<ShardResponse> {
- handlers::handle_shard_message(self, message).await
- }
-
- pub(crate) async fn handle_event(&self, event: ShardEvent) -> Result<(),
IggyError> {
- handlers::handle_event(self, event).await
- }
-
pub fn ensure_authenticated(&self, session: &Session) -> Result<(),
IggyError> {
if !session.is_active() {
error!("{COMPONENT} - session is inactive, session: {session}");
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index d10fd74a9..bbae65bbe 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -31,7 +31,7 @@ use err_trail::ErrContext;
use iggy_common::PooledBuffer;
use iggy_common::{
BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE,
Identifier, IggyError,
- Partitioning, PartitioningKind, PollingKind, PollingStrategy,
+ PollingKind, PollingStrategy,
};
use std::sync::atomic::Ordering;
use tracing::error;
@@ -42,7 +42,7 @@ impl IggyShard {
user_id: u32,
stream_id: Identifier,
topic_id: Identifier,
- partitioning: &Partitioning,
+ partition_id: usize,
batch: IggyMessagesBatchMut,
) -> Result<(), IggyError> {
self.ensure_topic_exists(&stream_id, &topic_id)?;
@@ -71,34 +71,9 @@ impl IggyShard {
return Ok(());
}
- let partition_id =
- self.streams
- .with_topic_by_id(
- &stream_id,
- &topic_id,
- |(root, auxilary, ..)| match partitioning.kind {
- PartitioningKind::Balanced => {
- let upperbound = root.partitions().len();
- Ok(auxilary.get_next_partition_id(upperbound))
- }
- PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
- partitioning.value[..partitioning.length as usize]
- .try_into()
- .map_err(|_|
IggyError::InvalidNumberEncoding)?,
- ) as usize),
- PartitioningKind::MessagesKey => {
- let upperbound = root.partitions().len();
- Ok(
-
topics::helpers::calculate_partition_id_by_messages_key_hash(
- upperbound,
- &partitioning.value,
- ),
- )
- }
- },
- )?;
-
self.ensure_partition_exists(&stream_id, &topic_id, partition_id)?;
+
+ // TODO(tungtose): DRY this code
let namespace = IggyNamespace::new(numeric_stream_id,
numeric_topic_id, partition_id);
let payload = ShardRequestPayload::SendMessages { batch };
let request = ShardRequest::new(stream_id.clone(), topic_id.clone(),
partition_id, payload);
diff --git a/core/server/src/shard/tasks/continuous/message_pump.rs
b/core/server/src/shard/tasks/continuous/message_pump.rs
index a709630ec..97e1f341d 100644
--- a/core/server/src/shard/tasks/continuous/message_pump.rs
+++ b/core/server/src/shard/tasks/continuous/message_pump.rs
@@ -16,9 +16,9 @@
* under the License.
*/
-use crate::shard::IggyShard;
use crate::shard::task_registry::ShutdownToken;
use crate::shard::transmission::frame::ShardFrame;
+use crate::shard::{IggyShard, handlers::handle_shard_message};
use futures::FutureExt;
use std::rc::Rc;
use tracing::{debug, info};
@@ -57,7 +57,7 @@ async fn message_pump(
match frame {
Ok(ShardFrame { message, response_sender }) => {
if let (Some(response), Some(tx)) =
- (shard.handle_shard_message(message).await,
response_sender)
+ (handle_shard_message(&shard, message).await,
response_sender)
{
let _ = tx.send(response).await;
}
diff --git a/core/server/src/shard/transmission/frame.rs
b/core/server/src/shard/transmission/frame.rs
index 0b2f980fc..7c0fd2578 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -42,6 +42,7 @@ pub enum ShardResponse {
CreateUserResponse(User),
DeletedUser(User),
GetStatsResponse(Stats),
+ SocketTransferResponse,
ErrorResponse(IggyError),
}
diff --git a/core/server/src/shard/transmission/message.rs
b/core/server/src/shard/transmission/message.rs
index 0f48b1bdb..fec8bf473 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -27,6 +27,8 @@ use iggy_common::{
CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, Permissions,
UserStatus,
};
+use std::{net::SocketAddr, os::fd::OwnedFd};
+
#[allow(clippy::large_enum_variant)]
pub enum ShardSendRequestResult {
// TODO: In the future we can add other variants, for example backpressure
from the destination shard,
@@ -127,6 +129,14 @@ pub enum ShardRequestPayload {
DeleteSegments {
segments_count: u32,
},
+ SocketTransfer {
+ fd: OwnedFd,
+ from_shard: u16,
+ client_id: u32,
+ user_id: u32,
+ address: SocketAddr,
+ initial_data: IggyMessagesBatchMut,
+ },
}
impl From<ShardRequest> for ShardMessage {
diff --git a/core/server/src/streaming/session.rs
b/core/server/src/streaming/session.rs
index 1afbd724d..c319e2e12 100644
--- a/core/server/src/streaming/session.rs
+++ b/core/server/src/streaming/session.rs
@@ -28,6 +28,7 @@ pub struct Session {
user_id: Cell<UserId>,
active: Cell<bool>,
pub ip_address: SocketAddr,
+ pub migrated: Cell<bool>,
}
impl Session {
@@ -36,6 +37,7 @@ impl Session {
client_id,
user_id: Cell::new(user_id),
active: Cell::new(true),
+ migrated: Cell::new(false),
ip_address,
}
}
@@ -60,6 +62,17 @@ impl Session {
self.active.set(false);
}
+ /// Returns true if this session's socket was migrated from another shard.
+ ///
+ /// Prevents socket ping-ponging between shards. Subsequent wrong-shard
requests use message forwarding instead.
+ pub fn is_migrated(&self) -> bool {
+ self.migrated.get()
+ }
+
+ pub fn set_migrated(&self) {
+ self.migrated.set(true)
+ }
+
pub fn clear_user_id(&self) {
self.set_user_id(u32::MAX);
}
diff --git a/core/server/src/tcp/connection_handler.rs
b/core/server/src/tcp/connection_handler.rs
index 7f307bcef..d0bdc5e4f 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -32,12 +32,21 @@ use tracing::{debug, error, info};
const INITIAL_BYTES_LENGTH: usize = 4;
+/// Connection lifecycle action after command handling.
+pub enum ConnectionAction {
+ /// Connection ended on this shard; the caller cleans it up.
+ Finished,
+
+ /// Connection migrated to another shard, exit without cleanup.
+ Migrated { to_shard: u16 },
+}
+
pub(crate) async fn handle_connection(
session: &Session,
sender: &mut SenderKind,
shard: &Rc<IggyShard>,
stop_receiver: Receiver<()>,
-) -> Result<(), ConnectionError> {
+) -> Result<ConnectionAction, ConnectionError> {
let mut length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
let mut code_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
loop {
@@ -49,7 +58,7 @@ pub(crate) async fn handle_connection(
_ = stop_receiver.recv().fuse() => {
info!("Connection stop signal received for session: {}",
session);
let _ =
sender.send_error_response(IggyError::Disconnected).await;
- return Ok(());
+ return Ok(ConnectionAction::Finished);
}
result = read_future.fuse() => {
match result {
@@ -83,11 +92,20 @@ pub(crate) async fn handle_connection(
debug!("Received a TCP command: {command}, payload size: {length}");
let cmd_code = command.code();
match command.handle(sender, length, session, shard).await {
- Ok(_) => {
- debug!(
- "Command {cmd_code} was handled successfully, session:
{session}. TCP response was sent."
- );
- }
+ Ok(handler_result) => match handler_result {
+ command::HandlerResult::Finished => {
+ debug!(
+ "Command {cmd_code} was handled successfully, session:
{session}. TCP response was sent."
+ );
+ }
+ command::HandlerResult::Migrated { to_shard } => {
+ info!(
+ "Command {cmd_code} was transfer to shard {to_shard},
session: {session}. TCP response was sent."
+ );
+
+ return Ok(ConnectionAction::Migrated { to_shard });
+ }
+ },
Err(error) => {
// Special handling for GetClusterMetadata when clustering is
disabled
if cmd_code == GET_CLUSTER_METADATA_CODE
@@ -102,6 +120,7 @@ pub(crate) async fn handle_connection(
error!(
"Command with code {cmd_code} was not handled
successfully, session: {session}, error: {error}."
);
+
if let IggyError::ClientNotFound(_) = error {
sender.send_error_response(error).await?;
debug!("TCP error response was sent to: {session}.");
diff --git a/core/server/src/tcp/tcp_listener.rs
b/core/server/src/tcp/tcp_listener.rs
index 85fedd50c..c68547301 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -19,9 +19,9 @@
use crate::configs::tcp::TcpSocketConfig;
use crate::shard::IggyShard;
-use crate::shard::task_registry::ShutdownToken;
+use crate::shard::task_registry::{ShutdownToken, TaskRegistry};
use crate::shard::transmission::event::ShardEvent;
-use crate::tcp::connection_handler::{handle_connection, handle_error};
+use crate::tcp::connection_handler::{ConnectionAction, handle_connection,
handle_error};
use compio::net::{TcpListener, TcpOpts};
use err_trail::ErrContext;
use futures::FutureExt;
@@ -150,16 +150,17 @@ async fn accept_loop(
let registry = shard.task_registry.clone();
let registry_clone = registry.clone();
registry.spawn_connection(async move {
- if let Err(error) = handle_connection(&session,
&mut sender, &shard_for_conn, conn_stop_receiver).await {
- handle_error(error);
- }
-
- registry_clone.remove_connection(&client_id);
- shard_for_conn.delete_client(session.client_id);
- if let Err(error) = sender.shutdown().await {
- error!("Failed to shutdown TCP stream for
client: {}, address: {}. {}", client_id, address, error);
- } else {
- info!("Successfully closed TCP stream for
client: {}, address: {}.", client_id, address);
+ match handle_connection(&session, &mut sender,
&shard_for_conn, conn_stop_receiver).await {
+ Ok(ConnectionAction::Migrated { to_shard }) =>
{
+ info!("Migrated to shard {to_shard},
ignore cleanup connection");
+ }
+ Ok(ConnectionAction::Finished) => {
+ cleanup_connection(&mut sender, client_id,
address, ®istry_clone, &shard_for_conn).await;
+ }
+ Err(err) => {
+ handle_error(err);
+ cleanup_connection(&mut sender, client_id,
address, ®istry_clone, &shard_for_conn).await;
+ },
}
});
}
@@ -170,3 +171,25 @@ async fn accept_loop(
}
Ok(())
}
+
+pub async fn cleanup_connection(
+ sender: &mut SenderKind,
+ client_id: u32,
+ address: SocketAddr,
+ registry: &Rc<TaskRegistry>,
+ shard: &IggyShard,
+) {
+ registry.remove_connection(&client_id);
+ shard.delete_client(client_id);
+ if let Err(error) = sender.shutdown().await {
+ error!(
+ "Failed to shutdown for client {}, address {}: {}",
+ client_id, address, error
+ );
+ } else {
+ info!(
+ "Successfully closed for client {}, address {}",
+ client_id, address
+ );
+ }
+}
diff --git a/core/server/src/websocket/websocket_listener.rs
b/core/server/src/websocket/websocket_listener.rs
index 773e5c9ea..7b691c61b 100644
--- a/core/server/src/websocket/websocket_listener.rs
+++ b/core/server/src/websocket/websocket_listener.rs
@@ -94,7 +94,9 @@ pub async fn start(
}
} else {
// Non-shard0 just handles the event locally
- shard.handle_event(event).await.ok();
+ crate::shard::handlers::handle_event(&shard, event)
+ .await
+ .ok();
}
let ws_config = config.to_tungstenite_config();
diff --git a/core/server/src/websocket/websocket_tls_listener.rs
b/core/server/src/websocket/websocket_tls_listener.rs
index 2432d5a1f..3743162fb 100644
--- a/core/server/src/websocket/websocket_tls_listener.rs
+++ b/core/server/src/websocket/websocket_tls_listener.rs
@@ -97,7 +97,9 @@ pub async fn start(
}
} else {
// Non-shard0 just handles the event locally
- shard.handle_event(event).await.ok();
+ crate::shard::handlers::handle_event(&shard, event)
+ .await
+ .ok();
}
// Ensure rustls crypto provider is installed