This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch merge_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 3c1518c000848a78d33c8d25fc6082cfee031e3f Merge: 30d567f6 88bc91d1 Author: numinex <[email protected]> AuthorDate: Wed Jul 9 16:40:34 2025 +0200 Merge branch 'master' into merge_master .github/changed-files-config.json | 3 +- .github/workflows/ci-check-csharp-sdk.yml | 4 +- .github/workflows/ci-check-go-sdk.yml | 15 + .github/workflows/ci-test-bdd.yml | 2 +- README.md | 18 +- bdd/README.md | 7 + .../ci-check-go-sdk.yml => bdd/csharp/Dockerfile | 34 +- bdd/docker-compose.yml | 10 + bdd/go/tests/tcp_test/client_feature_get_all.go | 26 +- bdd/go/tests/tcp_test/consumers_feature_create.go | 22 +- bdd/go/tests/tcp_test/consumers_feature_delete.go | 18 +- bdd/go/tests/tcp_test/consumers_feature_get_all.go | 12 +- .../tests/tcp_test/consumers_feature_get_by_id.go | 14 +- bdd/go/tests/tcp_test/consumers_feature_join.go | 18 +- bdd/go/tests/tcp_test/consumers_feature_leave.go | 18 +- bdd/go/tests/tcp_test/consumers_steps.go | 60 ++-- bdd/go/tests/tcp_test/messages_feature_send.go | 20 +- bdd/go/tests/tcp_test/messages_steps.go | 20 +- bdd/go/tests/tcp_test/partitions_feature_create.go | 16 +- bdd/go/tests/tcp_test/partitions_feature_delete.go | 16 +- bdd/go/tests/tcp_test/partitions_steps.go | 12 +- bdd/go/tests/tcp_test/pat_feature_create.go | 12 +- bdd/go/tests/tcp_test/pat_feature_delete.go | 12 +- bdd/go/tests/tcp_test/pat_feature_get_all.go | 12 +- bdd/go/tests/tcp_test/pat_steps.go | 22 +- bdd/go/tests/tcp_test/ping_feature.go | 12 +- bdd/go/tests/tcp_test/session_feature_login.go | 32 +- bdd/go/tests/tcp_test/session_feature_logout.go | 12 +- bdd/go/tests/tcp_test/stats_feature.go | 14 +- bdd/go/tests/tcp_test/stream_feature_create.go | 18 +- bdd/go/tests/tcp_test/stream_feature_delete.go | 14 +- bdd/go/tests/tcp_test/stream_feature_get_all.go | 12 +- bdd/go/tests/tcp_test/stream_feature_get_by_id.go | 10 +- bdd/go/tests/tcp_test/stream_feature_update.go | 18 +- bdd/go/tests/tcp_test/stream_steps.go | 48 +-- bdd/go/tests/tcp_test/tcp_suite_test.go | 8 +- bdd/go/tests/tcp_test/test_shared_steps.go | 20 +- bdd/go/tests/tcp_test/topic_feature_create.go | 20 +- bdd/go/tests/tcp_test/topic_feature_delete.go | 16 +- bdd/go/tests/tcp_test/topic_feature_get_all.go | 12 +- bdd/go/tests/tcp_test/topic_feature_get_by_id.go | 12 +- bdd/go/tests/tcp_test/topic_feature_update.go | 20 +- bdd/go/tests/tcp_test/topic_steps.go | 56 +-- bdd/go/tests/tcp_test/users_feature_create.go | 14 +- bdd/go/tests/tcp_test/users_feature_delete.go | 12 +- bdd/go/tests/tcp_test/users_feature_get_all.go | 12 +- bdd/go/tests/tcp_test/users_feature_get_by_id.go | 8 +- bdd/go/tests/tcp_test/users_feature_password.go | 13 +- bdd/go/tests/tcp_test/users_feature_permissions.go | 12 +- bdd/go/tests/tcp_test/users_feature_update.go | 12 +- bdd/go/tests/tcp_test/users_steps.go | 92 +++-- core/certs/iggy.pfx | Bin 2357 -> 2643 bytes core/certs/iggy_cert.pem | 35 +- core/certs/iggy_key.pem | 55 +-- core/server/src/tcp/tcp_tls_listener.rs | 22 +- foreign/csharp/.dockerignore | 400 +++++++++++++++++++++ foreign/csharp/DEPENDENCIES.md | 1 + foreign/csharp/Directory.Packages.props | 43 +-- .../Iggy_SDK.Tests.BDD/Context/TestContext.cs | 32 ++ .../csharp/Iggy_SDK.Tests.BDD/Context/TestHooks.cs | 55 +++ .../Iggy_SDK.Tests.BDD/Iggy_SDK.Tests.BDD.csproj | 44 +++ foreign/csharp/Iggy_SDK.Tests.BDD/README.md | 21 ++ .../BasicMessagingOperationsSteps.cs | 224 ++++++++++++ foreign/csharp/Iggy_SDK.sln | 6 + foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs | 3 +- foreign/csharp/Iggy_SDK/Messages/Message.cs | 13 + .../Iggy_SDK_Tests/MapperTests/BinaryMapper.cs | 1 + foreign/go/README.md | 8 + .../binary_request_serializer.go | 45 ++- .../binary_response_deserializer.go | 153 ++++---- .../binary_serialization/identifier_serializer.go | 5 +- foreign/go/contracts/partitions.go | 8 +- foreign/go/iggycli/client.go | 134 +++---- foreign/go/iggycli/iggy_client.go | 10 +- foreign/go/samples/consumer/consumer.go | 18 +- foreign/go/samples/producer/message_generator.go | 29 +- foreign/go/samples/producer/producer.go | 22 +- foreign/go/tcp/tcp_access_token_managament.go | 16 +- foreign/go/tcp/tcp_clients_managament.go | 10 +- foreign/go/tcp/tcp_consumer_group_managament.go | 28 +- foreign/go/tcp/tcp_core.go | 21 +- foreign/go/tcp/tcp_messaging.go | 24 +- foreign/go/tcp/tcp_offset_managament.go | 14 +- foreign/go/tcp/tcp_partition_managament.go | 14 +- foreign/go/tcp/tcp_session_managament.go | 14 +- foreign/go/tcp/tcp_stream_managament.go | 22 +- foreign/go/tcp/tcp_topic_managament.go | 26 +- foreign/go/tcp/tcp_user_managament.go | 38 +- foreign/go/tcp/tcp_utilities.go | 8 +- scripts/run-bdd-tests.sh | 12 +- 90 files changed, 1701 insertions(+), 885 deletions(-) diff --cc core/server/src/tcp/tcp_tls_listener.rs index 00d7b5c4,5d6c0cdf..804a07da --- a/core/server/src/tcp/tcp_tls_listener.rs +++ b/core/server/src/tcp/tcp_tls_listener.rs @@@ -36,93 -33,85 +36,97 @@@ use std::time::Duration use tracing::{error, info}; pub(crate) async fn start( - address: &str, - config: TcpTlsConfig, - socket: TcpSocket, - system: SharedSystem, -) -> SocketAddr { - let address = address.to_string(); - let (tx, rx) = oneshot::channel(); - tokio::spawn(async move { - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + server_name: &'static str, + addr: SocketAddr, + socket: Socket, + shard: Rc<IggyShard>, +) -> Result<(), IggyError> { + /* + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let config = &shard.config.tcp.tls; - let (certs, key) = - if config.self_signed && !std::path::Path::new(&config.cert_file).exists() { - info!("Generating self-signed certificate for TCP TLS server"); - generate_self_signed_cert() - .unwrap_or_else(|e| panic!("Failed to generate self-signed certificate: {e}")) - } else { - load_certificates(&config.cert_file, &config.key_file) - .unwrap_or_else(|e| panic!("Failed to load certificates: {e}")) - }; + let (certs, key) = + if config.self_signed && !std::path::Path::new(&config.cert_file).exists() { + info!("Generating self-signed certificate for TCP TLS server"); + generate_self_signed_cert() + .unwrap_or_else(|e| panic!("Failed to generate self-signed certificate: {e}")) + } else { + info!( + "Loading certificates from cert_file: {}, key_file: {}", + config.cert_file, config.key_file + ); + load_certificates(&config.cert_file, &config.key_file) + .unwrap_or_else(|e| panic!("Failed to load certificates: {e}")) + }; - let server_config = ServerConfig::builder() - .with_no_client_auth() - .with_single_cert(certs, key) - .unwrap_or_else(|e| panic!("Unable to create TLS server config: {e}")); + let server_config = ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, key) + .unwrap_or_else(|e| panic!("Unable to create TLS server config: {e}")); - let acceptor = TlsAcceptor::from(Arc::new(server_config)); + let acceptor = TlsAcceptor::from(Arc::new(server_config)); - let addr = address.parse(); - if addr.is_err() { - panic!("Unable to parse address {address:?}"); - } + socket + .bind(&addr.into()) + .unwrap_or_else(|e| panic!("Unable to bind socket to address '{addr}': {e}",)); + + let listener: std::net::TcpListener = socket.into(); + let listener = monoio::net::TcpListener::from_std(listener).unwrap(); + info!("{server_name} server has started on: {:?}", addr); + + loop { + let shutdown_check = async { + loop { + if shard.is_shutting_down() { + return; + } + monoio::time::sleep(Duration::from_millis(100)).await; + } + }; + + let accept_future = listener.accept(); + futures::select! { + _ = shutdown_check.fuse() => { + info!("TCP TLS server detected shutdown flag, no longer accepting connections"); + break; + } + result = accept_future.fuse() => { + match result { + Ok((stream, address)) => { + if shard.is_shutting_down() { + info!("Rejecting new TLS connection from {} during shutdown", address); + continue; + } + let shard_clone = shard.clone(); + info!("Accepted new TCP TLS connection: {}", address); + let transport = Transport::Tcp; + let session = shard_clone.add_client(&address, transport); + //TODO: Those can be shared with other shards. + shard_clone.add_active_session(session.clone()); + // Broadcast session to all shards. + let event = ShardEvent::NewSession { address, transport }; + // TODO: Fixme look inside of broadcast_event_to_all_shards method. + let _responses = shard_clone.broadcast_event_to_all_shards(event.into()); + + let client_id = session.client_id; + info!("Created new session: {session}"); + let acceptor = acceptor.clone(); + + let conn_stop_receiver = shard_clone.task_registry.add_connection(client_id); + + let shard_for_conn = shard_clone.clone(); + shard_clone.task_registry.spawn_tracked(async move { + match acceptor.accept(stream).await { + Ok(tls_stream) => { + let mut sender = SenderKind::get_tcp_tls_sender(tls_stream.into()); + if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await { + handle_error(error); + } + shard_for_conn.task_registry.remove_connection(&client_id); - let addr = addr.unwrap(); - socket - .bind(addr) - .unwrap_or_else(|e| panic!("Unable to bind socket to address '{addr}': {e}",)); - - let listener = socket - .listen(1024) - .unwrap_or_else(|e| panic!("Unable to start TCP TLS server on '{address}': {e}",)); - - let local_addr = listener - .local_addr() - .unwrap_or_else(|e| panic!("Failed to get local address for TCP TLS listener: {e}",)); - - tx.send(local_addr).unwrap_or_else(|_| { - panic!("Failed to send the local address '{local_addr}' for TCP TLS listener") - }); - - loop { - match listener.accept().await { - Ok((stream, address)) => { - info!("Accepted new TCP TLS connection: {}", address); - let session = system - .read() - .await - .add_client(&address, Transport::Tcp) - .await; - - let client_id = session.client_id; - let acceptor = acceptor.clone(); - let system_clone = system.clone(); - match acceptor.accept(stream).await { - Ok(stream) => { - let mut sender = SenderKind::get_tcp_tls_sender(stream); - tokio::spawn(async move { - if let Err(error) = - handle_connection(session, &mut sender, system_clone.clone()) - .await - { - handle_error(error); - system_clone.read().await.delete_client(client_id).await; if let Err(error) = sender.shutdown().await { error!( - "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}" + "Failed to shutdown TCP TLS stream for client: {client_id}, address: {address}. {error}" ); } else { info!(
