This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 76ee665c6578d7ca5b4564ba44c812dbe17bdd5d Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Jun 26 14:47:20 2023 +0200 MINIFICPP-2133 Add TLS 1.3 support Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1600 --- PROCESSORS.md | 6 +-- cmake/Asio.cmake | 4 +- extensions/http-curl/client/HTTPClient.cpp | 14 ++++++ .../standard-processors/processors/PutTCP.cpp | 25 +++++++++- extensions/standard-processors/processors/PutTCP.h | 5 +- extensions/standard-processors/processors/PutUDP.h | 2 +- .../standard-processors/tests/unit/GetTCPTests.cpp | 5 +- .../tests/unit/ListenTcpTests.cpp | 7 ++- .../standard-processors/tests/unit/PutTCPTests.cpp | 11 +++-- libminifi/include/utils/BaseHTTPClient.h | 1 + libminifi/include/utils/net/AsioSocketUtils.h | 5 +- libminifi/include/utils/net/TcpServer.h | 6 +-- libminifi/src/c2/ControllerSocketProtocol.cpp | 2 +- libminifi/src/controllers/SSLContextService.cpp | 6 +-- libminifi/src/utils/net/TcpServer.cpp | 53 +++++++++++++++------- libminifi/test/Utils.h | 7 ++- 16 files changed, 116 insertions(+), 43 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 286eab523..a865342dd 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -2433,7 +2433,7 @@ In the list below, the names of required properties appear in bold. Any other pr ### Description -The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specif [...] +The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specif [...] ### Properties @@ -2455,7 +2455,7 @@ In the list below, the names of required properties appear in bold. Any other pr | Name | Description | |---------|----------------------------------------------------------------------------| | success | FlowFiles that are sent to the destination are sent out this relationship. | -| failure | FlowFiles that encountered IO errors are send out this relationship. | +| failure | FlowFiles that encountered IO errors are sent out this relationship. | ## PutUDP @@ -2478,7 +2478,7 @@ In the list below, the names of required properties appear in bold. Any other pr | Name | Description | |---------|----------------------------------------------------------------------------| | success | FlowFiles that are sent to the destination are sent out this relationship. | -| failure | FlowFiles that encountered IO errors are send out this relationship. | +| failure | FlowFiles that encountered IO errors are sent out this relationship. | ## QueryDatabaseTable diff --git a/cmake/Asio.cmake b/cmake/Asio.cmake index 14c8f9592..28f4bb928 100644 --- a/cmake/Asio.cmake +++ b/cmake/Asio.cmake @@ -18,8 +18,8 @@ include(FetchContent) FetchContent_Declare(asio - URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-24-0.tar.gz - URL_HASH SHA256=cbcaaba0f66722787b1a7c33afe1befb3a012b5af3ad7da7ff0f6b8c9b7a8a5b) + URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-28-1.tar.gz + URL_HASH SHA256=5ff6111ec8cbe73a168d997c547f562713aa7bd004c5c02326f0e9d579a5f2ce) FetchContent_GetProperties(asio) if(NOT asio_POPULATED) diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp index af131f3d6..6e8d41100 100644 --- a/extensions/http-curl/client/HTTPClient.cpp +++ b/extensions/http-curl/client/HTTPClient.cpp @@ -167,6 +167,11 @@ bool HTTPClient::setSpecificSSLVersion(utils::SSLVersion specific_version) { ssl_context_service_->setMaxTlsVersion(TLS1_2_VERSION); break; } + case utils::SSLVersion::TLSv1_3: { + ssl_context_service_->setMinTlsVersion(TLS1_3_VERSION); + ssl_context_service_->setMaxTlsVersion(TLS1_3_VERSION); + break; + } default: break; } } @@ -181,6 +186,8 @@ bool HTTPClient::setSpecificSSLVersion(utils::SSLVersion specific_version) { return CURLE_OK == curl_easy_setopt(http_session_.get(), CURLOPT_SSLVERSION, static_cast<int>(CURL_SSLVERSION_TLSv1_1) | static_cast<int>(CURL_SSLVERSION_MAX_TLSv1_1)); case utils::SSLVersion::TLSv1_2: return CURLE_OK == curl_easy_setopt(http_session_.get(), CURLOPT_SSLVERSION, static_cast<int>(CURL_SSLVERSION_TLSv1_2) | static_cast<int>(CURL_SSLVERSION_MAX_TLSv1_2)); + case utils::SSLVersion::TLSv1_3: + return CURLE_OK == curl_easy_setopt(http_session_.get(), CURLOPT_SSLVERSION, static_cast<int>(CURL_SSLVERSION_TLSv1_3) | static_cast<int>(CURL_SSLVERSION_MAX_TLSv1_3)); default: return false; } #else @@ -205,6 +212,10 @@ bool HTTPClient::setMinimumSSLVersion(utils::SSLVersion minimum_version) { ssl_context_service_->setMinTlsVersion(TLS1_2_VERSION); break; } + case utils::SSLVersion::TLSv1_3: { + ssl_context_service_->setMinTlsVersion(TLS1_3_VERSION); + break; + } default: break; } } @@ -221,6 +232,9 @@ bool HTTPClient::setMinimumSSLVersion(utils::SSLVersion minimum_version) { case utils::SSLVersion::TLSv1_2: ret = curl_easy_setopt(http_session_.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); break; + case utils::SSLVersion::TLSv1_3: + ret = curl_easy_setopt(http_session_.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_3); + break; } return ret == CURLE_OK; diff --git a/extensions/standard-processors/processors/PutTCP.cpp b/extensions/standard-processors/processors/PutTCP.cpp index ece033a52..389f4d0ef 100644 --- a/extensions/standard-processors/processors/PutTCP.cpp +++ b/extensions/standard-processors/processors/PutTCP.cpp @@ -120,7 +120,9 @@ class ConnectionHandler : public ConnectionHandlerBase { ssl_context_(ssl_context) { } - ~ConnectionHandler() override = default; + ~ConnectionHandler() override { + shutdownSocket(); + } asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter, @@ -144,6 +146,7 @@ class ConnectionHandler : public ConnectionHandlerBase { asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter); SocketType createNewSocket(asio::io_context& io_context_); + void shutdownSocket(); utils::net::ConnectionId connection_id_; std::optional<SocketType> socket_; @@ -169,6 +172,26 @@ SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_con return {io_context_, *ssl_context_}; } +template<> +void ConnectionHandler<TcpSocket>::shutdownSocket() { +} + +template<> +void ConnectionHandler<SslSocket>::shutdownSocket() { + gsl_Expects(ssl_context_); + if (socket_) { + asio::error_code ec; + socket_->lowest_layer().cancel(ec); + if (ec) { + logger_->log_error("Cancelling asynchronous operations of SSL socket failed with: %s", ec.message()); + } + socket_->shutdown(ec); + if (ec) { + logger_->log_error("Shutdown of SSL socket failed with: %s", ec.message()); + } + } +} + template<class SocketType> asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context) { auto socket = createNewSocket(io_context); diff --git a/extensions/standard-processors/processors/PutTCP.h b/extensions/standard-processors/processors/PutTCP.h index 5063bae67..b45e2c624 100644 --- a/extensions/standard-processors/processors/PutTCP.h +++ b/extensions/standard-processors/processors/PutTCP.h @@ -61,7 +61,8 @@ class PutTCP final : public core::Processor { "By default, the FlowFiles are transmitted over the same TCP connection. To assist the TCP server with determining message boundaries, " "an optional \"Outgoing Message Delimiter\" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. " "An optional \"Connection Per FlowFile\" parameter can be specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection " - "which is closed after the FlowFile has been sent."; + "which is closed after the FlowFile has been sent. Note: When using TLS 1.3 the processor can still route the flow file to success if the TLS handshake fails. This is due to TLS 1.3's " + "faster handshake process which allows the message to be sent before we know the result of the TLS handshake."; EXTENSIONAPI static constexpr auto Hostname = core::PropertyDefinitionBuilder<>::createProperty("Hostname") .withDescription("The ip address or hostname of the destination.") @@ -125,7 +126,7 @@ class PutTCP final : public core::Processor { EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles that are sent to the destination are sent out this relationship."}; - EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "FlowFiles that encountered IO errors are send out this relationship."}; + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "FlowFiles that encountered IO errors are sent out this relationship."}; EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; diff --git a/extensions/standard-processors/processors/PutUDP.h b/extensions/standard-processors/processors/PutUDP.h index ec0ad92ce..b43722153 100644 --- a/extensions/standard-processors/processors/PutUDP.h +++ b/extensions/standard-processors/processors/PutUDP.h @@ -50,7 +50,7 @@ class PutUDP final : public core::Processor { EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 2>{Hostname, Port}; EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles that are sent to the destination are sent out this relationship."}; - EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "FlowFiles that encountered IO errors are send out this relationship."}; + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "FlowFiles that encountered IO errors are sent out this relationship."}; EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; diff --git a/extensions/standard-processors/tests/unit/GetTCPTests.cpp b/extensions/standard-processors/tests/unit/GetTCPTests.cpp index e60d02dbe..c27dd5d4e 100644 --- a/extensions/standard-processors/tests/unit/GetTCPTests.cpp +++ b/extensions/standard-processors/tests/unit/GetTCPTests.cpp @@ -72,7 +72,7 @@ class TcpTestServer { void enableSSL() { const std::filesystem::path executable_dir = minifi::utils::file::FileUtils::get_executable_dir(); - asio::ssl::context ssl_context(asio::ssl::context::tlsv12_server); + asio::ssl::context ssl_context(asio::ssl::context::tls_server); ssl_context.set_options(minifi::utils::net::MINIFI_SSL_OPTIONS); ssl_context.set_password_callback([key_pw = "Password12"](std::size_t&, asio::ssl::context_base::password_purpose&) { return key_pw; }); ssl_context.use_certificate_file((executable_dir / "resources" / "localhost_by_A.pem").string(), asio::ssl::context::pem); @@ -113,6 +113,9 @@ class TcpTestServer { co_return; } co_await sendMessages(ssl_socket); + asio::error_code ec; + ssl_socket.lowest_layer().cancel(ec); + co_await ssl_socket.async_shutdown(minifi::utils::net::use_nothrow_awaitable); } asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket) { diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp index 4e0cf1b2f..b1afd5f13 100644 --- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp +++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp @@ -22,10 +22,12 @@ #include "Utils.h" #include "controllers/SSLContextService.h" #include "range/v3/algorithm/contains.hpp" +#include "utils/IntegrationTestUtils.h" using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP; using namespace std::literals::chrono_literals; +using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; namespace org::apache::nifi::minifi::test { @@ -211,7 +213,8 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP][NetworkListenerProc endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port); } - CHECK_THAT(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt"), MatchesError()); + utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / "resources" / "ca_A.crt"); + CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "peer did not return a certificate (SSL routines)")); } ProcessorTriggerResult result; @@ -287,7 +290,7 @@ TEST_CASE("Test ListenTCP SSL/TLS compatibility", "[ListenTCP][NetworkListenerPr SECTION("tlsv13 should be enabled") { client_method = asio::ssl::context::method::tlsv13_client; - expected_to_work = false; + expected_to_work = true; } if (!isSslMethodAvailable(client_method)) diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp b/extensions/standard-processors/tests/unit/PutTCPTests.cpp index c367d3ae7..be38776ef 100644 --- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp +++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp @@ -32,6 +32,7 @@ #include "IntegrationTestUtils.h" using namespace std::literals::chrono_literals; +using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; namespace org::apache::nifi::minifi::processors { @@ -64,12 +65,14 @@ class CancellableTcpServer : public utils::net::TcpServer { logger_->log_error("Error during accepting new connection: %s", accept_error.message()); break; } + std::error_code error; + auto remote_address = socket.lowest_layer().remote_endpoint(error).address(); auto cancellable_timer = std::make_shared<asio::steady_timer>(io_context_); cancellable_timers_.push_back(cancellable_timer); if (ssl_data_) - co_spawn(io_context_, secureSession(std::move(socket)) || wait_until_cancelled(cancellable_timer), asio::detached); + co_spawn(io_context_, secureSession(std::move(socket), std::move(remote_address), port_) || wait_until_cancelled(cancellable_timer), asio::detached); else - co_spawn(io_context_, insecureSession(std::move(socket)) || wait_until_cancelled(cancellable_timer), asio::detached); + co_spawn(io_context_, insecureSession(std::move(socket), std::move(remote_address), port_) || wait_until_cancelled(cancellable_timer), asio::detached); } } @@ -400,9 +403,9 @@ TEST_CASE("PutTCP test missing client cert", "[PutTCP]") { auto port = test_fixture.addSSLServer(); test_fixture.setPutTCPPort(port); - trigger_expect_failure(test_fixture, "message for invalid-cert server"); + test_fixture.trigger("message for invalid-cert server"); - CHECK(LogTestController::getInstance().matchesRegex("Handshake with .* failed", 0ms)); + CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "peer did not return a certificate (SSL routines)")); } TEST_CASE("PutTCP test idle connection expiration", "[PutTCP]") { diff --git a/libminifi/include/utils/BaseHTTPClient.h b/libminifi/include/utils/BaseHTTPClient.h index 5ffd79a69..d145a7c59 100644 --- a/libminifi/include/utils/BaseHTTPClient.h +++ b/libminifi/include/utils/BaseHTTPClient.h @@ -94,6 +94,7 @@ enum class SSLVersion : uint8_t { TLSv1_0, TLSv1_1, TLSv1_2, + TLSv1_3 }; struct HTTPHeaderResponse { diff --git a/libminifi/include/utils/net/AsioSocketUtils.h b/libminifi/include/utils/net/AsioSocketUtils.h index 8be24ba9e..f7f7cbeaf 100644 --- a/libminifi/include/utils/net/AsioSocketUtils.h +++ b/libminifi/include/utils/net/AsioSocketUtils.h @@ -43,7 +43,8 @@ using HandshakeType = asio::ssl::stream_base::handshake_type; using TcpSocket = asio::ip::tcp::socket; using SslSocket = asio::ssl::stream<asio::ip::tcp::socket>; -constexpr auto MINIFI_SSL_OPTIONS = asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use; +constexpr auto MINIFI_SSL_OPTIONS = asio::ssl::context::default_workarounds | asio::ssl::context::single_dh_use + | asio::ssl::context::no_sslv2 | asio::ssl::context::no_sslv3 | asio::ssl::context::no_tlsv1 | asio::ssl::context::no_tlsv1_1; class ConnectionId { public: @@ -69,7 +70,7 @@ template<> asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket, asio::steady_timer::duration); -asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service, asio::ssl::context::method ssl_context_method = asio::ssl::context::tlsv12_client); +asio::ssl::context getSslContext(const controllers::SSLContextService& ssl_context_service, asio::ssl::context::method ssl_context_method = asio::ssl::context::tls_client); struct SocketData { std::string host = "localhost"; diff --git a/libminifi/include/utils/net/TcpServer.h b/libminifi/include/utils/net/TcpServer.h index 717d674ed..5a2532c76 100644 --- a/libminifi/include/utils/net/TcpServer.h +++ b/libminifi/include/utils/net/TcpServer.h @@ -34,10 +34,10 @@ class TcpServer : public Server { protected: asio::awaitable<void> doReceive() override; - asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket); - asio::awaitable<void> secureSession(asio::ip::tcp::socket socket); + asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port); + asio::awaitable<void> secureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port); - asio::awaitable<void> readLoop(auto& socket); + asio::awaitable<void> readLoop(auto& socket, const auto& remote_address, const auto& local_port); std::optional<SslServerOptions> ssl_data_; }; diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp index 94ee18acf..44b61f252 100644 --- a/libminifi/src/c2/ControllerSocketProtocol.cpp +++ b/libminifi/src/c2/ControllerSocketProtocol.cpp @@ -100,7 +100,7 @@ asio::awaitable<void> ControllerSocketProtocol::startAccept() { } asio::awaitable<void> ControllerSocketProtocol::handshakeAndHandleCommand(asio::ip::tcp::socket&& socket, std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) { - asio::ssl::context ssl_context = utils::net::getSslContext(*ssl_context_service, asio::ssl::context::tlsv12_server); + asio::ssl::context ssl_context = utils::net::getSslContext(*ssl_context_service, asio::ssl::context::tls_server); ssl_context.set_options(utils::net::MINIFI_SSL_OPTIONS); asio::ssl::stream<asio::ip::tcp::socket> ssl_socket(std::move(socket), ssl_context); diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp index 1631fcaed..f82da7242 100644 --- a/libminifi/src/controllers/SSLContextService.cpp +++ b/libminifi/src/controllers/SSLContextService.cpp @@ -128,7 +128,9 @@ bool SSLContextService::configure_ssl_context(SSL_CTX *ctx) { } // Security level set to 0 for backwards compatibility to support TLS versions below v1.2 - SSL_CTX_set_security_level(ctx, 0); + if ((minimum_tls_version_ != -1 && minimum_tls_version_ < TLS1_2_VERSION) || (maximum_tls_version_ != -1 && maximum_tls_version_ < TLS1_2_VERSION)) { + SSL_CTX_set_security_level(ctx, 0); + } if (minimum_tls_version_ != -1) { SSL_CTX_set_min_proto_version(ctx, minimum_tls_version_); @@ -136,8 +138,6 @@ bool SSLContextService::configure_ssl_context(SSL_CTX *ctx) { if (maximum_tls_version_ != -1) { SSL_CTX_set_max_proto_version(ctx, maximum_tls_version_); - } else { - SSL_CTX_set_max_proto_version(ctx, TLS1_2_VERSION); } return true; diff --git a/libminifi/src/utils/net/TcpServer.cpp b/libminifi/src/utils/net/TcpServer.cpp index cf2fe24e2..510318443 100644 --- a/libminifi/src/utils/net/TcpServer.cpp +++ b/libminifi/src/utils/net/TcpServer.cpp @@ -33,28 +33,37 @@ asio::awaitable<void> TcpServer::doReceive() { co_await utils::net::async_wait(1s); continue; } + std::error_code error; + auto remote_address = socket.lowest_layer().remote_endpoint(error).address(); + if (error) + logger_->log_warn("Error during fetching remote endpoint: %s", error.message()); + auto local_port = socket.lowest_layer().local_endpoint(error).port(); + if (error) + logger_->log_warn("Error during fetching local endpoint: %s", error.message()); if (ssl_data_) - co_spawn(io_context_, secureSession(std::move(socket)), asio::detached); + co_spawn(io_context_, secureSession(std::move(socket), std::move(remote_address), local_port), asio::detached); else - co_spawn(io_context_, insecureSession(std::move(socket)), asio::detached); + co_spawn(io_context_, insecureSession(std::move(socket), std::move(remote_address), local_port), asio::detached); } } -asio::awaitable<void> TcpServer::readLoop(auto& socket) { +asio::awaitable<void> TcpServer::readLoop(auto& socket, const auto& remote_address, const auto& local_port) { std::string read_message; while (true) { auto [read_error, bytes_read] = co_await asio::async_read_until(socket, asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable); // NOLINT - if (read_error || bytes_read == 0) + if (read_error) { + if (read_error != asio::error::eof) { + logger_->log_error("Error during reading from socket: %s", read_error.message()); + } co_return; + } + + if (bytes_read == 0) { + logger_->log_debug("No more bytes were read from socket"); + co_return; + } if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) { - std::error_code error; - auto remote_address = socket.lowest_layer().remote_endpoint(error).address(); - if (error) - logger_->log_debug("Error during fetching remote endpoint: %s", error.message()); - auto local_port = socket.lowest_layer().local_endpoint(error).port(); - if (error) - logger_->log_debug("Error during fetching local endpoint: %s", error.message()); concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read - 1), IpProtocol::TCP, remote_address, local_port)); } else { logger_->log_warn("Queue is full. TCP message ignored."); @@ -63,13 +72,13 @@ asio::awaitable<void> TcpServer::readLoop(auto& socket) { } } -asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket) { - co_return co_await readLoop(socket); // NOLINT +asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port) { + co_return co_await readLoop(socket, remote_address, local_port); // NOLINT } namespace { asio::ssl::context setupSslContext(SslServerOptions& ssl_data) { - asio::ssl::context ssl_context(asio::ssl::context::tlsv12_server); + asio::ssl::context ssl_context(asio::ssl::context::tls_server); ssl_context.set_options(minifi::utils::net::MINIFI_SSL_OPTIONS); ssl_context.set_password_callback([key_pw = ssl_data.cert_data.key_pw](std::size_t&, asio::ssl::context_base::password_purpose&) { return key_pw; }); ssl_context.use_certificate_file(ssl_data.cert_data.cert_loc.string(), asio::ssl::context::pem); @@ -85,16 +94,26 @@ asio::ssl::context setupSslContext(SslServerOptions& ssl_data) { } } // namespace -asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket) { +asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port) { gsl_Expects(ssl_data_); auto ssl_context = setupSslContext(*ssl_data_); SslSocket ssl_socket(std::move(socket), ssl_context); auto [handshake_error] = co_await ssl_socket.async_handshake(HandshakeType::server, use_nothrow_awaitable); if (handshake_error) { - core::logging::LOG_WARN(logger_) << "Handshake with " << ssl_socket.lowest_layer().remote_endpoint() << " failed due to " << handshake_error.message(); + core::logging::LOG_WARN(logger_) << "Handshake with " << remote_address << " failed due to " << handshake_error.message(); co_return; } - co_return co_await readLoop(ssl_socket); // NOLINT + co_await readLoop(ssl_socket, remote_address, local_port); // NOLINT + + asio::error_code ec; + ssl_socket.lowest_layer().cancel(ec); + if (ec) { + logger_->log_error("Cancelling asynchronous operations of SSL socket failed with: %s", ec.message()); + } + auto [shutdown_error] = co_await ssl_socket.async_shutdown(use_nothrow_awaitable); + if (shutdown_error) { + core::logging::LOG_WARN(logger_) << "Shutdown of " << remote_address << " failed with " << shutdown_error.message(); + } } } // namespace org::apache::nifi::minifi::utils::net diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h index 1777cc9af..b3ed11854 100644 --- a/libminifi/test/Utils.h +++ b/libminifi/test/Utils.h @@ -175,7 +175,7 @@ std::error_code sendMessagesViaSSL(const std::vector<std::string_view>& contents const asio::ip::tcp::endpoint& remote_endpoint, const std::filesystem::path& ca_cert_path, const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt, - asio::ssl::context::method method = asio::ssl::context::tlsv12_client) { + asio::ssl::context::method method = asio::ssl::context::tls_client) { asio::ssl::context ctx(method); ctx.load_verify_file(ca_cert_path.string()); if (ssl_data) { @@ -186,6 +186,11 @@ std::error_code sendMessagesViaSSL(const std::vector<std::string_view>& contents } asio::io_context io_context; asio::ssl::stream<asio::ip::tcp::socket> socket(io_context, ctx); + auto shutdown_socket = gsl::finally([&] { + asio::error_code ec; + socket.lowest_layer().cancel(ec); + socket.shutdown(ec); + }); asio::error_code err; socket.lowest_layer().connect(remote_endpoint, err); if (err) {
