numinnex commented on code in PR #2219:
URL: https://github.com/apache/iggy/pull/2219#discussion_r2387118965
##########
core/server/src/quic/listener.rs:
##########
@@ -16,56 +16,74 @@
* under the License.
*/
-use std::rc::Rc;
-
use crate::binary::command::{ServerCommand, ServerCommandHandler};
use crate::binary::sender::SenderKind;
use crate::server_error::ConnectionError;
use crate::shard::IggyShard;
+use crate::shard::task_registry::task_registry;
use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use crate::{shard_debug, shard_info};
use anyhow::anyhow;
use compio_quic::{Connection, Endpoint, RecvStream, SendStream};
+use futures::FutureExt;
use iggy_common::IggyError;
use iggy_common::TransportProtocol;
+use std::rc::Rc;
use tracing::{error, info, trace};
const INITIAL_BYTES_LENGTH: usize = 4;
pub async fn start(endpoint: Endpoint, shard: Rc<IggyShard>) -> Result<(),
IggyError> {
- info!("Starting QUIC listener for shard {}", shard.id);
-
- // Since the QUIC Endpoint is internally Arc-wrapped and can be shared,
- // we only need one worker per shard rather than multiple workers per
endpoint.
- // This avoids the N×workers multiplication when multiple shards are used.
- while let Some(incoming_conn) = endpoint.wait_incoming().await {
- let remote_addr = incoming_conn.remote_address();
- trace!("Incoming connection from client: {}", remote_addr);
- let shard = shard.clone();
-
- // Spawn each connection handler independently to maintain concurrency
- compio::runtime::spawn(async move {
- trace!("Accepting connection from {}", remote_addr);
- match incoming_conn.await {
- Ok(connection) => {
- trace!("Connection established from {}", remote_addr);
- if let Err(error) = handle_connection(connection,
shard).await {
- error!("QUIC connection from {} has failed: {error}",
remote_addr);
+ let shutdown = task_registry().shutdown_token();
+
+ loop {
+ let accept_future = endpoint.wait_incoming();
+
+ futures::select! {
+ _ = shutdown.wait().fuse() => {
+ shard_debug!(shard.id, "QUIC listener received shutdown
signal, no longer accepting connections");
+ break;
+ }
+ incoming_conn = accept_future.fuse() => {
+ match incoming_conn {
+ Some(incoming_conn) => {
+ let remote_addr = incoming_conn.remote_address();
+
+ if shard.is_shutting_down() {
+ shard_info!(shard.id, "Rejecting new QUIC
connection from {} during shutdown", remote_addr);
+ continue;
+ }
+
+ trace!("Incoming connection from client: {}",
remote_addr);
+ let shard_for_conn = shard.clone();
+
+ compio::runtime::spawn(async move {
Review Comment:
Why don't we use the TaskRegistry for those tasks?
##########
core/server/src/shard/task_registry/builders/continuous.rs:
##########
@@ -0,0 +1,118 @@
+use crate::shard::IggyShard;
+use crate::shard::task_registry::registry::TaskRegistry;
+use crate::shard::task_registry::specs::{
+ ContinuousTask, TaskCtx, TaskFuture, TaskMeta, TaskScope,
+};
+use futures::future::LocalBoxFuture;
+use iggy_common::IggyError;
+use std::{fmt::Debug, marker::PhantomData, rc::Rc};
+
+use crate::shard::task_registry::builders::{HasTask, NoTask};
+
+pub struct ContinuousBuilder<'a, S = NoTask> {
+ reg: &'a TaskRegistry,
+ name: &'static str,
+ scope: TaskScope,
+ critical: bool,
+ shard: Option<Rc<IggyShard>>,
+ run: Option<Box<dyn FnOnce(TaskCtx) -> LocalBoxFuture<'static, Result<(),
IggyError>>>>,
Review Comment:
Since this is a builder for a task that stores only one `run` closure, we could
make the task generic rather than using dynamic dispatch for the `FnOnce`
trait. Also, I think we should use `AsyncFnOnce` rather than `FnOnce`, and
return the result directly instead of a `LocalBoxFuture`.
##########
core/server/src/shard/task_registry/tls.rs:
##########
@@ -0,0 +1,104 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::registry::TaskRegistry;
+use std::cell::RefCell;
+use std::rc::Rc;
+
+thread_local! {
+ static REGISTRY: RefCell<Option<Rc<TaskRegistry>>> = RefCell::new(None);
+}
+
+pub fn init_task_registry(shard_id: u16) {
+ REGISTRY.with(|s| {
+ *s.borrow_mut() = Some(Rc::new(TaskRegistry::new(shard_id)));
+ });
+}
+
+pub fn task_registry() -> Rc<TaskRegistry> {
+ REGISTRY.with(|s| {
Review Comment:
There is a `with_borrow` method that can be used here instead of `with` + `borrow`.
##########
core/server/src/main.rs:
##########
@@ -354,29 +353,51 @@ async fn main() -> Result<(), ServerError> {
}
let shutdown_handles_for_signal = shutdown_handles.clone();
- /*
- ::set_handler(move || {
- info!("Received shutdown signal (SIGTERM/SIGINT), initiating
graceful shutdown...");
-
- for (shard_id, stop_sender) in &shutdown_handles_for_signal {
- info!("Sending shutdown signal to shard {}", shard_id);
- if let Err(e) = stop_sender.send_blocking(()) {
- error!(
- "Failed to send shutdown signal to shard {}: {}",
- shard_id, e
- );
- }
+
+ ctrlc::set_handler(move || {
Review Comment:
compio has a `signal` module; can't we use it?
https://compio.rs/docs/compio/signal
##########
core/server/src/http/http_server.rs:
##########
Review Comment:
There is a stray tokio task in this file; replace it with a compio one.
##########
core/server/src/shard/task_registry/builders/continuous.rs:
##########
@@ -0,0 +1,118 @@
+use crate::shard::IggyShard;
+use crate::shard::task_registry::registry::TaskRegistry;
+use crate::shard::task_registry::specs::{
+ ContinuousTask, TaskCtx, TaskFuture, TaskMeta, TaskScope,
+};
+use futures::future::LocalBoxFuture;
+use iggy_common::IggyError;
+use std::{fmt::Debug, marker::PhantomData, rc::Rc};
+
+use crate::shard::task_registry::builders::{HasTask, NoTask};
+
+pub struct ContinuousBuilder<'a, S = NoTask> {
+ reg: &'a TaskRegistry,
+ name: &'static str,
+ scope: TaskScope,
+ critical: bool,
+ shard: Option<Rc<IggyShard>>,
+ run: Option<Box<dyn FnOnce(TaskCtx) -> LocalBoxFuture<'static, Result<(),
IggyError>>>>,
+ _p: PhantomData<S>,
+}
+
+impl<'a> ContinuousBuilder<'a, NoTask> {
+ pub fn new(reg: &'a TaskRegistry, name: &'static str) -> Self {
+ Self {
+ reg,
+ name,
+ scope: TaskScope::AllShards,
+ critical: false,
+ shard: None,
+ run: None,
+ _p: PhantomData,
+ }
+ }
+
+ pub fn on_shard(mut self, scope: TaskScope) -> Self {
+ self.scope = scope;
+ self
+ }
+
+ pub fn critical(mut self, c: bool) -> Self {
+ self.critical = c;
+ self
+ }
+
+ pub fn with_shard(mut self, shard: Rc<IggyShard>) -> Self {
+ self.shard = Some(shard);
+ self
+ }
+
+ pub fn run<F, Fut>(self, f: F) -> ContinuousBuilder<'a, HasTask>
+ where
+ F: FnOnce(TaskCtx) -> Fut + 'static,
+ Fut: std::future::Future<Output = Result<(), IggyError>> + 'static,
+ {
+ ContinuousBuilder {
+ reg: self.reg,
+ name: self.name,
+ scope: self.scope,
+ critical: self.critical,
+ shard: self.shard,
+ run: Some(Box::new(move |ctx| Box::pin(f(ctx)))),
+ _p: PhantomData,
+ }
+ }
+}
+
+impl<'a> ContinuousBuilder<'a, HasTask> {
+ pub fn spawn(self) {
+ let shard = self.shard.expect("shard required");
+ if !self.scope.should_run(&shard) {
Review Comment:
Let's try to move away from passing the entire shard in places where we only
need some metadata about it. This way it's easier to disaggregate the shard's
components, its metadata, and the shard itself into different modules. Create
either a type alias for `shard_id`, or even a newtype for it, so it can be
attached to a `shard` module in the future.
Also, one more thought: it looks like the `spawn` function should be part of
some sort of trait.
##########
core/server/src/http/http_server.rs:
##########
@@ -105,7 +111,16 @@ pub async fn start(config: HttpConfig, persister:
Arc<PersisterKind>, shard: Rc<
app = app.layer(middleware::from_fn_with_state(app_state.clone(),
metrics));
}
- start_expired_tokens_cleaner(app_state.clone());
+ {
+ task_registry().spawn_periodic(
+ shard.clone(),
+ Box::new(ClearJwtTokens::new(
+ app_state.clone(),
+ JWT_TOKENS_CLEANER_PERIOD,
+ )),
+ );
+ }
Review Comment:
Why is this inside of a scope?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]