hubcio commented on code in PR #2219:
URL: https://github.com/apache/iggy/pull/2219#discussion_r2387548669
##########
core/server/src/quic/listener.rs:
##########
@@ -16,56 +16,74 @@
* under the License.
*/
-use std::rc::Rc;
-
use crate::binary::command::{ServerCommand, ServerCommandHandler};
use crate::binary::sender::SenderKind;
use crate::server_error::ConnectionError;
use crate::shard::IggyShard;
+use crate::shard::task_registry::task_registry;
use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use crate::{shard_debug, shard_info};
use anyhow::anyhow;
use compio_quic::{Connection, Endpoint, RecvStream, SendStream};
+use futures::FutureExt;
use iggy_common::IggyError;
use iggy_common::TransportProtocol;
+use std::rc::Rc;
use tracing::{error, info, trace};
const INITIAL_BYTES_LENGTH: usize = 4;
pub async fn start(endpoint: Endpoint, shard: Rc<IggyShard>) -> Result<(), IggyError> {
- info!("Starting QUIC listener for shard {}", shard.id);
-
- // Since the QUIC Endpoint is internally Arc-wrapped and can be shared,
- // we only need one worker per shard rather than multiple workers per endpoint.
- // This avoids the N×workers multiplication when multiple shards are used.
- while let Some(incoming_conn) = endpoint.wait_incoming().await {
- let remote_addr = incoming_conn.remote_address();
- trace!("Incoming connection from client: {}", remote_addr);
- let shard = shard.clone();
-
- // Spawn each connection handler independently to maintain concurrency
- compio::runtime::spawn(async move {
- trace!("Accepting connection from {}", remote_addr);
- match incoming_conn.await {
- Ok(connection) => {
- trace!("Connection established from {}", remote_addr);
- if let Err(error) = handle_connection(connection, shard).await {
- error!("QUIC connection from {} has failed: {error}", remote_addr);
+ let shutdown = task_registry().shutdown_token();
+
+ loop {
+ let accept_future = endpoint.wait_incoming();
+
+ futures::select! {
+ _ = shutdown.wait().fuse() => {
+ shard_debug!(shard.id, "QUIC listener received shutdown signal, no longer accepting connections");
+ break;
+ }
+ incoming_conn = accept_future.fuse() => {
+ match incoming_conn {
+ Some(incoming_conn) => {
+ let remote_addr = incoming_conn.remote_address();
+
+ if shard.is_shutting_down() {
+ shard_info!(shard.id, "Rejecting new QUIC connection from {} during shutdown", remote_addr);
+ continue;
+ }
+
+ trace!("Incoming connection from client: {}", remote_addr);
+ let shard_for_conn = shard.clone();
+
+ compio::runtime::spawn(async move {
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]