tungtose commented on code in PR #2476:
URL: https://github.com/apache/iggy/pull/2476#discussion_r2627806220
##########
core/server/src/binary/handlers/messages/send_messages_handler.rs:
##########
@@ -107,19 +110,96 @@ impl ServerCommandHandler for SendMessages {
);
batch.validate()?;
+ shard.ensure_topic_exists(&self.stream_id, &self.topic_id)?;
+
+ let numeric_stream_id = shard
+ .streams
+ .with_stream_by_id(&self.stream_id,
streams::helpers::get_stream_id());
+
+ let numeric_topic_id = shard.streams.with_topic_by_id(
+ &self.stream_id,
+ &self.topic_id,
+ topics::helpers::get_topic_id(),
+ );
+
+ // TODO(tungtose): dry this code && get partition_id below have a side
effect
+ let partition_id = shard.streams.with_topic_by_id(
+ &self.stream_id,
+ &self.topic_id,
+ |(root, auxilary, ..)| match self.partitioning.kind {
+ PartitioningKind::Balanced => {
+ let upperbound = root.partitions().len();
+ let pid = auxilary.get_next_partition_id(upperbound);
+ Ok(pid)
+ }
+ PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
+ self.partitioning.value[..self.partitioning.length as
usize]
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ ) as usize),
+ PartitioningKind::MessagesKey => {
+ let upperbound = root.partitions().len();
+ Ok(
+
topics::helpers::calculate_partition_id_by_messages_key_hash(
+ upperbound,
+ &self.partitioning.value,
+ ),
+ )
+ }
+ },
+ )?;
+
+ let namespace = IggyNamespace::new(numeric_stream_id,
numeric_topic_id, partition_id);
let user_id = session.get_user_id();
+ let balanced_partitioning = matches!(self.partitioning.kind,
PartitioningKind::Balanced);
+ let enabled_socket_migration = shard.config.tcp.socket_migration;
+
+ if enabled_socket_migration
+ && !(session.is_migrated() || balanced_partitioning)
+ && let Some(target_shard) = shard.find_shard(&namespace)
+ && target_shard.id != shard.id
+ {
+ debug!(
+ "TCP wrong shared detected: migrating from_shard {}, to_shard
{}",
+ shard.id, target_shard.id
+ );
+
+ if let Some(fd) = sender.take_and_migrate_tcp() {
+ let payload = ShardRequestPayload::SocketTransfer {
+ fd,
+ from_shard: shard.id,
+ client_id: session.client_id,
+ user_id,
+ address: session.ip_address,
+ initial_data: batch,
+ };
+
+ let request = ShardRequest::new(
+ self.stream_id.clone(),
+ self.topic_id.clone(),
+ partition_id,
+ payload,
+ );
+
+ let socket_transfer_msg = ShardMessage::Request(request);
+
+ shard
+ .send_request_to_shard_or_recoil(Some(&namespace),
socket_transfer_msg)
+ .await?;
Review Comment:
Is this good enough?
```
if let Err(e) = shard
.send_request_to_shard_or_recoil(Some(&namespace),
socket_transfer_msg)
.await
{
error!("tranfer socket to another shard failed, drop
connection. {e:?}");
return Ok(HandlerResult::Finished);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]