This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch merge_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 3c1518c000848a78d33c8d25fc6082cfee031e3f
Merge: 30d567f6 88bc91d1
Author: numinex <[email protected]>
AuthorDate: Wed Jul 9 16:40:34 2025 +0200

    Merge branch 'master' into merge_master

 .github/changed-files-config.json                  |   3 +-
 .github/workflows/ci-check-csharp-sdk.yml          |   4 +-
 .github/workflows/ci-check-go-sdk.yml              |  15 +
 .github/workflows/ci-test-bdd.yml                  |   2 +-
 README.md                                          |  18 +-
 bdd/README.md                                      |   7 +
 .../ci-check-go-sdk.yml => bdd/csharp/Dockerfile   |  34 +-
 bdd/docker-compose.yml                             |  10 +
 bdd/go/tests/tcp_test/client_feature_get_all.go    |  26 +-
 bdd/go/tests/tcp_test/consumers_feature_create.go  |  22 +-
 bdd/go/tests/tcp_test/consumers_feature_delete.go  |  18 +-
 bdd/go/tests/tcp_test/consumers_feature_get_all.go |  12 +-
 .../tests/tcp_test/consumers_feature_get_by_id.go  |  14 +-
 bdd/go/tests/tcp_test/consumers_feature_join.go    |  18 +-
 bdd/go/tests/tcp_test/consumers_feature_leave.go   |  18 +-
 bdd/go/tests/tcp_test/consumers_steps.go           |  60 ++--
 bdd/go/tests/tcp_test/messages_feature_send.go     |  20 +-
 bdd/go/tests/tcp_test/messages_steps.go            |  20 +-
 bdd/go/tests/tcp_test/partitions_feature_create.go |  16 +-
 bdd/go/tests/tcp_test/partitions_feature_delete.go |  16 +-
 bdd/go/tests/tcp_test/partitions_steps.go          |  12 +-
 bdd/go/tests/tcp_test/pat_feature_create.go        |  12 +-
 bdd/go/tests/tcp_test/pat_feature_delete.go        |  12 +-
 bdd/go/tests/tcp_test/pat_feature_get_all.go       |  12 +-
 bdd/go/tests/tcp_test/pat_steps.go                 |  22 +-
 bdd/go/tests/tcp_test/ping_feature.go              |  12 +-
 bdd/go/tests/tcp_test/session_feature_login.go     |  32 +-
 bdd/go/tests/tcp_test/session_feature_logout.go    |  12 +-
 bdd/go/tests/tcp_test/stats_feature.go             |  14 +-
 bdd/go/tests/tcp_test/stream_feature_create.go     |  18 +-
 bdd/go/tests/tcp_test/stream_feature_delete.go     |  14 +-
 bdd/go/tests/tcp_test/stream_feature_get_all.go    |  12 +-
 bdd/go/tests/tcp_test/stream_feature_get_by_id.go  |  10 +-
 bdd/go/tests/tcp_test/stream_feature_update.go     |  18 +-
 bdd/go/tests/tcp_test/stream_steps.go              |  48 +--
 bdd/go/tests/tcp_test/tcp_suite_test.go            |   8 +-
 bdd/go/tests/tcp_test/test_shared_steps.go         |  20 +-
 bdd/go/tests/tcp_test/topic_feature_create.go      |  20 +-
 bdd/go/tests/tcp_test/topic_feature_delete.go      |  16 +-
 bdd/go/tests/tcp_test/topic_feature_get_all.go     |  12 +-
 bdd/go/tests/tcp_test/topic_feature_get_by_id.go   |  12 +-
 bdd/go/tests/tcp_test/topic_feature_update.go      |  20 +-
 bdd/go/tests/tcp_test/topic_steps.go               |  56 +--
 bdd/go/tests/tcp_test/users_feature_create.go      |  14 +-
 bdd/go/tests/tcp_test/users_feature_delete.go      |  12 +-
 bdd/go/tests/tcp_test/users_feature_get_all.go     |  12 +-
 bdd/go/tests/tcp_test/users_feature_get_by_id.go   |   8 +-
 bdd/go/tests/tcp_test/users_feature_password.go    |  13 +-
 bdd/go/tests/tcp_test/users_feature_permissions.go |  12 +-
 bdd/go/tests/tcp_test/users_feature_update.go      |  12 +-
 bdd/go/tests/tcp_test/users_steps.go               |  92 +++--
 core/certs/iggy.pfx                                | Bin 2357 -> 2643 bytes
 core/certs/iggy_cert.pem                           |  35 +-
 core/certs/iggy_key.pem                            |  55 +--
 core/server/src/tcp/tcp_tls_listener.rs            |  22 +-
 foreign/csharp/.dockerignore                       | 400 +++++++++++++++++++++
 foreign/csharp/DEPENDENCIES.md                     |   1 +
 foreign/csharp/Directory.Packages.props            |  43 +--
 .../Iggy_SDK.Tests.BDD/Context/TestContext.cs      |  32 ++
 .../csharp/Iggy_SDK.Tests.BDD/Context/TestHooks.cs |  55 +++
 .../Iggy_SDK.Tests.BDD/Iggy_SDK.Tests.BDD.csproj   |  44 +++
 foreign/csharp/Iggy_SDK.Tests.BDD/README.md        |  21 ++
 .../BasicMessagingOperationsSteps.cs               | 224 ++++++++++++
 foreign/csharp/Iggy_SDK.sln                        |   6 +
 foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs    |   3 +-
 foreign/csharp/Iggy_SDK/Messages/Message.cs        |  13 +
 .../Iggy_SDK_Tests/MapperTests/BinaryMapper.cs     |   1 +
 foreign/go/README.md                               |   8 +
 .../binary_request_serializer.go                   |  45 ++-
 .../binary_response_deserializer.go                | 153 ++++----
 .../binary_serialization/identifier_serializer.go  |   5 +-
 foreign/go/contracts/partitions.go                 |   8 +-
 foreign/go/iggycli/client.go                       | 134 +++----
 foreign/go/iggycli/iggy_client.go                  |  10 +-
 foreign/go/samples/consumer/consumer.go            |  18 +-
 foreign/go/samples/producer/message_generator.go   |  29 +-
 foreign/go/samples/producer/producer.go            |  22 +-
 foreign/go/tcp/tcp_access_token_managament.go      |  16 +-
 foreign/go/tcp/tcp_clients_managament.go           |  10 +-
 foreign/go/tcp/tcp_consumer_group_managament.go    |  28 +-
 foreign/go/tcp/tcp_core.go                         |  21 +-
 foreign/go/tcp/tcp_messaging.go                    |  24 +-
 foreign/go/tcp/tcp_offset_managament.go            |  14 +-
 foreign/go/tcp/tcp_partition_managament.go         |  14 +-
 foreign/go/tcp/tcp_session_managament.go           |  14 +-
 foreign/go/tcp/tcp_stream_managament.go            |  22 +-
 foreign/go/tcp/tcp_topic_managament.go             |  26 +-
 foreign/go/tcp/tcp_user_managament.go              |  38 +-
 foreign/go/tcp/tcp_utilities.go                    |   8 +-
 scripts/run-bdd-tests.sh                           |  12 +-
 90 files changed, 1701 insertions(+), 885 deletions(-)

diff --cc core/server/src/tcp/tcp_tls_listener.rs
index 00d7b5c4,5d6c0cdf..804a07da
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@@ -36,93 -33,85 +36,97 @@@ use std::time::Duration
  use tracing::{error, info};
  
  pub(crate) async fn start(
 -    address: &str,
 -    config: TcpTlsConfig,
 -    socket: TcpSocket,
 -    system: SharedSystem,
 -) -> SocketAddr {
 -    let address = address.to_string();
 -    let (tx, rx) = oneshot::channel();
 -    tokio::spawn(async move {
 -        let _ = 
rustls::crypto::aws_lc_rs::default_provider().install_default();
 +    server_name: &'static str,
 +    addr: SocketAddr,
 +    socket: Socket,
 +    shard: Rc<IggyShard>,
 +) -> Result<(), IggyError> {
 +    /*
 +    let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
 +    let config = &shard.config.tcp.tls;
  
-     let (certs, key) =
-         if config.self_signed && 
!std::path::Path::new(&config.cert_file).exists() {
-             info!("Generating self-signed certificate for TCP TLS server");
-             generate_self_signed_cert()
-                 .unwrap_or_else(|e| panic!("Failed to generate self-signed 
certificate: {e}"))
-         } else {
-             load_certificates(&config.cert_file, &config.key_file)
-                 .unwrap_or_else(|e| panic!("Failed to load certificates: 
{e}"))
-         };
+         let (certs, key) =
+             if config.self_signed && 
!std::path::Path::new(&config.cert_file).exists() {
+                 info!("Generating self-signed certificate for TCP TLS 
server");
+                 generate_self_signed_cert()
+                     .unwrap_or_else(|e| panic!("Failed to generate 
self-signed certificate: {e}"))
+             } else {
+                 info!(
+                     "Loading certificates from cert_file: {}, key_file: {}",
+                     config.cert_file, config.key_file
+                 );
+                 load_certificates(&config.cert_file, &config.key_file)
+                     .unwrap_or_else(|e| panic!("Failed to load certificates: 
{e}"))
+             };
  
 -        let server_config = ServerConfig::builder()
 -            .with_no_client_auth()
 -            .with_single_cert(certs, key)
 -            .unwrap_or_else(|e| panic!("Unable to create TLS server config: 
{e}"));
 +    let server_config = ServerConfig::builder()
 +        .with_no_client_auth()
 +        .with_single_cert(certs, key)
 +        .unwrap_or_else(|e| panic!("Unable to create TLS server config: 
{e}"));
  
 -        let acceptor = TlsAcceptor::from(Arc::new(server_config));
 +    let acceptor = TlsAcceptor::from(Arc::new(server_config));
  
 -        let addr = address.parse();
 -        if addr.is_err() {
 -            panic!("Unable to parse address {address:?}");
 -        }
 +    socket
 +        .bind(&addr.into())
 +        .unwrap_or_else(|e| panic!("Unable to bind socket to address 
'{addr}': {e}",));
 +
 +    let listener: std::net::TcpListener = socket.into();
 +    let listener = monoio::net::TcpListener::from_std(listener).unwrap();
 +    info!("{server_name} server has started on: {:?}", addr);
 +
 +    loop {
 +        let shutdown_check = async {
 +            loop {
 +                if shard.is_shutting_down() {
 +                    return;
 +                }
 +                monoio::time::sleep(Duration::from_millis(100)).await;
 +            }
 +        };
 +
 +        let accept_future = listener.accept();
 +        futures::select! {
 +            _ = shutdown_check.fuse() => {
 +                info!("TCP TLS server detected shutdown flag, no longer 
accepting connections");
 +                break;
 +            }
 +            result = accept_future.fuse() => {
 +                match result {
 +                    Ok((stream, address)) => {
 +                        if shard.is_shutting_down() {
 +                            info!("Rejecting new TLS connection from {} 
during shutdown", address);
 +                            continue;
 +                        }
 +                        let shard_clone = shard.clone();
 +                        info!("Accepted new TCP TLS connection: {}", address);
 +                        let transport = Transport::Tcp;
 +                        let session = shard_clone.add_client(&address, 
transport);
 +                        //TODO: Those can be shared with other shards.
 +                        shard_clone.add_active_session(session.clone());
 +                        // Broadcast session to all shards.
 +                        let event = ShardEvent::NewSession { address, 
transport };
 +                        // TODO: Fixme look inside of 
broadcast_event_to_all_shards method.
 +                        let _responses = 
shard_clone.broadcast_event_to_all_shards(event.into());
 +
 +                        let client_id = session.client_id;
 +                        info!("Created new session: {session}");
 +                        let acceptor = acceptor.clone();
 +
 +                        let conn_stop_receiver = 
shard_clone.task_registry.add_connection(client_id);
 +
 +                        let shard_for_conn = shard_clone.clone();
 +                        shard_clone.task_registry.spawn_tracked(async move {
 +                            match acceptor.accept(stream).await {
 +                                Ok(tls_stream) => {
 +                                    let mut sender = 
SenderKind::get_tcp_tls_sender(tls_stream.into());
 +                                    if let Err(error) = 
handle_connection(&session, &mut sender, &shard_for_conn, 
conn_stop_receiver).await {
 +                                        handle_error(error);
 +                                    }
 +                                    
shard_for_conn.task_registry.remove_connection(&client_id);
  
 -        let addr = addr.unwrap();
 -        socket
 -            .bind(addr)
 -            .unwrap_or_else(|e| panic!("Unable to bind socket to address 
'{addr}': {e}",));
 -
 -        let listener = socket
 -            .listen(1024)
 -            .unwrap_or_else(|e| panic!("Unable to start TCP TLS server on 
'{address}': {e}",));
 -
 -        let local_addr = listener
 -            .local_addr()
 -            .unwrap_or_else(|e| panic!("Failed to get local address for TCP 
TLS listener: {e}",));
 -
 -        tx.send(local_addr).unwrap_or_else(|_| {
 -            panic!("Failed to send the local address '{local_addr}' for TCP 
TLS listener")
 -        });
 -
 -        loop {
 -            match listener.accept().await {
 -                Ok((stream, address)) => {
 -                    info!("Accepted new TCP TLS connection: {}", address);
 -                    let session = system
 -                        .read()
 -                        .await
 -                        .add_client(&address, Transport::Tcp)
 -                        .await;
 -
 -                    let client_id = session.client_id;
 -                    let acceptor = acceptor.clone();
 -                    let system_clone = system.clone();
 -                    match acceptor.accept(stream).await {
 -                        Ok(stream) => {
 -                            let mut sender = 
SenderKind::get_tcp_tls_sender(stream);
 -                            tokio::spawn(async move {
 -                                if let Err(error) =
 -                                    handle_connection(session, &mut sender, 
system_clone.clone())
 -                                        .await
 -                                {
 -                                    handle_error(error);
 -                                    
system_clone.read().await.delete_client(client_id).await;
                                      if let Err(error) = 
sender.shutdown().await {
                                          error!(
 -                                            "Failed to shutdown TCP stream 
for client: {client_id}, address: {address}. {error}"
 +                                            "Failed to shutdown TCP TLS 
stream for client: {client_id}, address: {address}. {error}"
                                          );
                                      } else {
                                          info!(

Reply via email to