hubcio commented on code in PR #2476:
URL: https://github.com/apache/iggy/pull/2476#discussion_r2627560347
##########
core/server/src/streaming/utils/crypto.rs:
##########
@@ -32,6 +32,7 @@ pub fn hash_password(password: &str) -> String {
}
pub fn verify_password(password: &str, hash: &str) -> bool {
+ // tracing::error!("Checking password: {password}, hash: {hash}");
Review Comment:
Remove this commented-out log line; if re-enabled it would write the plaintext password and hash to the logs.
##########
core/common/src/sender/mod.rs:
##########
@@ -114,6 +117,26 @@ impl SenderKind {
Self::WebSocketTls(stream)
}
+ pub fn take_and_migrate_tcp(&mut self) -> Option<RawFd> {
+ match self {
+ SenderKind::Tcp(tcp_sender) => {
+ let stream = tcp_sender.stream.take()?;
+ let poll_fd = stream.into_poll_fd().ok()?;
+
+ let raw_fd = poll_fd.as_fd();
+ let Ok(owned_fd) = nix::unistd::dup(raw_fd) else {
+ // TODO(tungtose): recover tcp stream?
+ error!("Failed to dup fd");
+ return None;
+ };
+
+ let raw_fd = owned_fd.into_raw_fd();
+ Some(raw_fd)
+ }
+ _ => None,
Review Comment:
Add a TODO here: support TCP TLS.
##########
core/common/src/sender/mod.rs:
##########
@@ -114,6 +117,26 @@ impl SenderKind {
Self::WebSocketTls(stream)
}
+ pub fn take_and_migrate_tcp(&mut self) -> Option<RawFd> {
+ match self {
+ SenderKind::Tcp(tcp_sender) => {
+ let stream = tcp_sender.stream.take()?;
+ let poll_fd = stream.into_poll_fd().ok()?;
+
+ let raw_fd = poll_fd.as_fd();
+ let Ok(owned_fd) = nix::unistd::dup(raw_fd) else {
+ // TODO(tungtose): recover tcp stream?
+ error!("Failed to dup fd");
+ return None;
+ };
+
+ let raw_fd = owned_fd.into_raw_fd();
Review Comment:
I was wondering, can we use OwnedFd instead of into_raw_fd? This way we
would get an automatic close if there's an error later (RAII).
##########
core/server/src/binary/handlers/messages/send_messages_handler.rs:
##########
@@ -107,19 +110,96 @@ impl ServerCommandHandler for SendMessages {
);
batch.validate()?;
+ shard.ensure_topic_exists(&self.stream_id, &self.topic_id)?;
+
+ let numeric_stream_id = shard
+ .streams
+ .with_stream_by_id(&self.stream_id,
streams::helpers::get_stream_id());
+
+ let numeric_topic_id = shard.streams.with_topic_by_id(
+ &self.stream_id,
+ &self.topic_id,
+ topics::helpers::get_topic_id(),
+ );
+
+ // TODO(tungtose): dry this code && get partition_id below have a side
effect
+ let partition_id = shard.streams.with_topic_by_id(
+ &self.stream_id,
+ &self.topic_id,
+ |(root, auxilary, ..)| match self.partitioning.kind {
+ PartitioningKind::Balanced => {
+ let upperbound = root.partitions().len();
+ let pid = auxilary.get_next_partition_id(upperbound);
+ Ok(pid)
+ }
+ PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
+ self.partitioning.value[..self.partitioning.length as
usize]
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ ) as usize),
+ PartitioningKind::MessagesKey => {
+ let upperbound = root.partitions().len();
+ Ok(
+
topics::helpers::calculate_partition_id_by_messages_key_hash(
+ upperbound,
+ &self.partitioning.value,
+ ),
+ )
+ }
+ },
+ )?;
+
+ let namespace = IggyNamespace::new(numeric_stream_id,
numeric_topic_id, partition_id);
let user_id = session.get_user_id();
+ let balanced_partitioning = matches!(self.partitioning.kind,
PartitioningKind::Balanced);
+ let enabled_socket_migration = shard.config.tcp.socket_migration;
+
+ if enabled_socket_migration
+ && !(session.is_migrated() || balanced_partitioning)
+ && let Some(target_shard) = shard.find_shard(&namespace)
+ && target_shard.id != shard.id
+ {
+ debug!(
+ "TCP wrong shared detected: migrating from_shard {}, to_shard
{}",
+ shard.id, target_shard.id
+ );
+
+ if let Some(fd) = sender.take_and_migrate_tcp() {
+ let payload = ShardRequestPayload::SocketTransfer {
+ fd,
+ from_shard: shard.id,
+ client_id: session.client_id,
+ user_id,
+ address: session.ip_address,
+ initial_data: batch,
+ };
+
+ let request = ShardRequest::new(
+ self.stream_id.clone(),
+ self.topic_id.clone(),
+ partition_id,
+ payload,
+ );
+
+ let socket_transfer_msg = ShardMessage::Request(request);
+
+ shard
+ .send_request_to_shard_or_recoil(Some(&namespace),
socket_transfer_msg)
+ .await?;
Review Comment:
If send_request_to_shard_or_recoil fails, the fd has already been extracted
from the TcpSender (stream is None), but no cleanup occurs, so the fd leaks
and the connection is left in an inconsistent state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]