This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit b6d0eb137bd63e4d5b287a932bea8df9cc41dc48 Author: Marton Szasz <[email protected]> AuthorDate: Thu Jan 8 05:32:26 2026 +0100 MINIFICPP-2701 ListenUDP udp.sender.port property - added tryDequeue overload with optional return type to MinifiConcurrentQueue - renamed port / server_port to remote and local - added remote port to Message, adapted all the dependent files - added udp.sender.port output attribute to ListenUDP - extended test utility to capture the local endpoint after UDP send - extended ListenUDPTests to test that the new property is set appropriately Closes #2084 Signed-off-by: Ferenc Gerlits <[email protected]> --- PROCESSORS.md | 9 ++++--- .../include/utils/MinifiConcurrentQueue.h | 15 ++++++++--- extension-framework/include/utils/net/Message.h | 15 +++++------ extension-framework/include/utils/net/Server.h | 8 ++++-- extension-framework/include/utils/net/TcpServer.h | 6 ++--- extension-framework/include/utils/net/UdpServer.h | 2 -- extension-framework/src/utils/net/TcpServer.cpp | 29 ++++++++++++---------- extension-framework/src/utils/net/UdpServer.cpp | 9 +++---- .../standard-processors/processors/GetTCP.cpp | 20 +++++++++------ extensions/standard-processors/processors/GetTCP.h | 7 ++++-- .../processors/ListenSyslog.cpp | 4 +-- .../standard-processors/processors/ListenTCP.cpp | 4 +-- .../standard-processors/processors/ListenUDP.cpp | 5 ++-- .../standard-processors/processors/ListenUDP.h | 5 ++-- .../processors/NetworkListenerProcessor.cpp | 9 ++++--- .../processors/NetworkListenerProcessor.h | 8 +++--- .../tests/unit/ListenSyslogTests.cpp | 26 +++++++++---------- .../tests/unit/ListenUDPTests.cpp | 24 ++++++++++-------- .../standard-processors/tests/unit/PutTCPTests.cpp | 10 ++++---- .../standard-processors/tests/unit/PutUDPTests.cpp | 7 +++--- libminifi/test/libtest/unit/TestUtils.cpp | 16 +++++++----- libminifi/test/libtest/unit/TestUtils.h | 7 +++--- 22 files changed, 140 insertions(+), 105 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 1fc8af238..7851a06af 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -1565,10 +1565,11 @@ In the list below, the names of required properties appear in bold. Any other pr ### Output Attributes -| Attribute | Relationship | Description | -|------------|--------------|----------------------------------------------| -| udp.port | | The sending port the messages were received. | -| udp.sender | | The sending host of the messages. | +| Attribute | Relationship | Description | +|-----------------|--------------|---------------------------------------------------------| +| udp.port | | The listening port on which the messages were received. | +| udp.sender | | The sending host of the messages. | +| udp.sender.port | | The sending port of the messages. | ## ListFile diff --git a/core-framework/include/utils/MinifiConcurrentQueue.h b/core-framework/include/utils/MinifiConcurrentQueue.h index 490cc601a..04dead821 100644 --- a/core-framework/include/utils/MinifiConcurrentQueue.h +++ b/core-framework/include/utils/MinifiConcurrentQueue.h @@ -17,15 +17,17 @@ #pragma once #include <algorithm> +#include <atomic> #include <chrono> +#include <condition_variable> #include <deque> #include <mutex> -#include <condition_variable> -#include <utility> +#include <optional> #include <stdexcept> -#include <atomic> +#include <utility> #include "utils/TryMoveCall.h" +#include "minifi-cpp/utils/gsl.h" namespace org::apache::nifi::minifi::utils { @@ -56,6 +58,13 @@ class ConcurrentQueue { return tryDequeueImpl(lck, out); } + std::optional<T> tryDequeue() { + std::optional<T> result = std::nullopt; + const bool consume_result = consume([&result](T value) { result = std::move(value); }); + gsl_Assert(consume_result == result.has_value()); + return result; + } + template<typename Functor> bool consume(Functor&& fun) { std::unique_lock<std::mutex> lck(mtx_); diff --git a/extension-framework/include/utils/net/Message.h b/extension-framework/include/utils/net/Message.h index 2cc05f717..499a1f5aa 100644 --- a/extension-framework/include/utils/net/Message.h +++ b/extension-framework/include/utils/net/Message.h @@ -25,20 +25,21 @@ namespace org::apache::nifi::minifi::utils::net { struct Message { - public: - Message() = default; - Message(std::string message_data, IpProtocol protocol, asio::ip::address sender_address, asio::ip::port_type server_port) + Message() = delete; + Message(std::string message_data, IpProtocol protocol, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port) : message_data(std::move(message_data)), protocol(protocol), - server_port(server_port), - sender_address(std::move(sender_address)) { + remote_address(std::move(remote_address)), + remote_port(remote_port), + local_port(local_port) { } bool is_partial = false; std::string message_data; IpProtocol protocol; - asio::ip::port_type server_port; - asio::ip::address sender_address; + asio::ip::address remote_address; + asio::ip::port_type remote_port; + asio::ip::port_type local_port; }; } // namespace org::apache::nifi::minifi::utils::net diff --git a/extension-framework/include/utils/net/Server.h b/extension-framework/include/utils/net/Server.h index bda53c7fa..34d6d89a8 100644 --- a/extension-framework/include/utils/net/Server.h +++ b/extension-framework/include/utils/net/Server.h @@ -47,9 +47,13 @@ class Server { bool queueEmpty() { return concurrent_queue_.empty(); } - bool tryDequeue(utils::net::Message& received_message) { - return concurrent_queue_.tryDequeue(received_message); + std::optional<utils::net::Message> tryDequeue() { + return concurrent_queue_.tryDequeue(); } + Server(const Server&) = delete; + Server(Server&&) = delete; + Server& operator=(const Server&) = delete; + Server& operator=(Server&&) = delete; virtual ~Server() { stop(); } diff --git a/extension-framework/include/utils/net/TcpServer.h b/extension-framework/include/utils/net/TcpServer.h index 00f4146d3..0301e6a3e 100644 --- a/extension-framework/include/utils/net/TcpServer.h +++ b/extension-framework/include/utils/net/TcpServer.h @@ -41,10 +41,10 @@ class TcpServer : public Server { protected: asio::awaitable<void> doReceive() override; - asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port); - asio::awaitable<void> secureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port); + asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port); + asio::awaitable<void> secureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port); - asio::awaitable<void> readLoop(auto& socket, const auto& remote_address, const auto& local_port); + asio::awaitable<void> readLoop(auto& socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port); bool consume_delimiter_; const std::string delimiter_; diff --git a/extension-framework/include/utils/net/UdpServer.h b/extension-framework/include/utils/net/UdpServer.h index 4fb2dcb93..c84dc01d9 100644 --- a/extension-framework/include/utils/net/UdpServer.h +++ b/extension-framework/include/utils/net/UdpServer.h @@ -18,11 +18,9 @@ #include <optional> #include <memory> -#include <string> #include <asio/awaitable.hpp> #include "Server.h" -#include "utils/MinifiConcurrentQueue.h" #include "minifi-cpp/core/logging/Logger.h" #include "core/logging/LoggerFactory.h" diff --git a/extension-framework/src/utils/net/TcpServer.cpp b/extension-framework/src/utils/net/TcpServer.cpp index 1c51fa265..abcf6b827 100644 --- a/extension-framework/src/utils/net/TcpServer.cpp +++ b/extension-framework/src/utils/net/TcpServer.cpp @@ -34,20 +34,23 @@ asio::awaitable<void> TcpServer::doReceive() { continue; } std::error_code error; - auto remote_address = socket.lowest_layer().remote_endpoint(error).address(); - if (error) + auto remote_endpoint = socket.lowest_layer().remote_endpoint(error); + if (error) { logger_->log_warn("Error during fetching remote endpoint: {}", error.message()); + } auto local_port = socket.lowest_layer().local_endpoint(error).port(); - if (error) + if (error) { logger_->log_warn("Error during fetching local endpoint: {}", error.message()); - if (ssl_data_) - co_spawn(io_context_, secureSession(std::move(socket), std::move(remote_address), local_port), asio::detached); - else - co_spawn(io_context_, insecureSession(std::move(socket), std::move(remote_address), local_port), asio::detached); + } + if (ssl_data_) { + co_spawn(io_context_, secureSession(std::move(socket), remote_endpoint.address(), remote_endpoint.port(), local_port), asio::detached); + } else { + co_spawn(io_context_, insecureSession(std::move(socket), remote_endpoint.address(), remote_endpoint.port(), local_port), asio::detached); + } } } -asio::awaitable<void> TcpServer::readLoop(auto& socket, const auto& remote_address, const auto& local_port) { +asio::awaitable<void> TcpServer::readLoop(auto& socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port) { std::string read_message; while (true) { auto [read_error, bytes_read] = co_await asio::async_read_until(socket, asio::dynamic_buffer(read_message), delimiter_, use_nothrow_awaitable); // NOLINT @@ -65,7 +68,7 @@ asio::awaitable<void> TcpServer::readLoop(auto& socket, const auto& remote_addre if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) { auto message_str = read_message.substr(0, bytes_read - (consume_delimiter_ ? delimiter_.size() : 0)); - concurrent_queue_.enqueue(Message(std::move(message_str), IpProtocol::TCP, remote_address, local_port)); + concurrent_queue_.enqueue(Message(std::move(message_str), IpProtocol::TCP, remote_address, remote_port, local_port)); } else { logger_->log_warn("Queue is full. TCP message ignored."); } @@ -73,8 +76,8 @@ asio::awaitable<void> TcpServer::readLoop(auto& socket, const auto& remote_addre } } -asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port) { - co_return co_await readLoop(socket, remote_address, local_port); // NOLINT +asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port) { + co_return co_await readLoop(socket, remote_address, remote_port, local_port); // NOLINT } namespace { @@ -95,7 +98,7 @@ asio::ssl::context setupSslContext(SslServerOptions& ssl_data) { } } // namespace -asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port) { +asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port) { gsl_Expects(ssl_data_); auto ssl_context = setupSslContext(*ssl_data_); SslSocket ssl_socket(std::move(socket), ssl_context); @@ -104,7 +107,7 @@ asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket, asi logger_->log_warn("Handshake with {} failed due to {}", remote_address, handshake_error.message()); co_return; } - co_await readLoop(ssl_socket, remote_address, local_port); // NOLINT + co_await readLoop(ssl_socket, remote_address, remote_port, local_port); asio::error_code ec; std::ignore = ssl_socket.lowest_layer().cancel(ec); diff --git a/extension-framework/src/utils/net/UdpServer.cpp b/extension-framework/src/utils/net/UdpServer.cpp index de0713b3f..35f0933d7 100644 --- a/extension-framework/src/utils/net/UdpServer.cpp +++ b/extension-framework/src/utils/net/UdpServer.cpp @@ -15,8 +15,6 @@ * limitations under the License. */ #include "utils/net/UdpServer.h" -#include "asio/use_awaitable.hpp" -#include "asio/detached.hpp" #include "utils/net/AsioCoro.h" namespace org::apache::nifi::minifi::utils::net { @@ -43,10 +41,11 @@ asio::awaitable<void> UdpServer::doReceive() { continue; } buffer.resize(bytes_received); - if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) - concurrent_queue_.enqueue(utils::net::Message(std::move(buffer), IpProtocol::UDP, sender_endpoint.address(), socket.local_endpoint().port())); - else + if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) { + concurrent_queue_.enqueue(utils::net::Message(std::move(buffer), IpProtocol::UDP, sender_endpoint.address(), sender_endpoint.port(), socket.local_endpoint().port())); + } else { logger_->log_warn("Queue is full. UDP message ignored."); + } } } diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp index 5e1ef25ec..ee515662b 100644 --- a/extensions/standard-processors/processors/GetTCP.cpp +++ b/extensions/standard-processors/processors/GetTCP.cpp @@ -117,7 +117,7 @@ void GetTCP::notifyStop() { void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) { auto flow_file = session.create(); session.writeBuffer(flow_file, message.message_data); - flow_file->setAttribute(GetTCP::SourceEndpoint.name, fmt::format("{}:{}", message.sender_address.to_string(), std::to_string(message.server_port))); + flow_file->setAttribute(GetTCP::SourceEndpoint.name, fmt::format("{}:{}", message.remote_address.to_string(), std::to_string(message.remote_port))); if (message.is_partial) session.transfer(flow_file, Partial); else @@ -128,11 +128,12 @@ void GetTCP::onTrigger(core::ProcessContext&, core::ProcessSession& session) { gsl_Expects(max_batch_size_ > 0); size_t logs_processed = 0; while (!client_->queueEmpty() && logs_processed < max_batch_size_) { - utils::net::Message received_message; - if (!client_->tryDequeue(received_message)) + if (const auto received_message = client_->tryDequeue()) { + transferAsFlowFile(received_message.value(), session); + ++logs_processed; + } else { break; - transferAsFlowFile(received_message, session); - ++logs_processed; + } } } @@ -175,8 +176,8 @@ bool GetTCP::TcpClient::queueEmpty() const { return concurrent_queue_.empty(); } -bool GetTCP::TcpClient::tryDequeue(utils::net::Message& received_message) { - return concurrent_queue_.tryDequeue(received_message); +std::optional<utils::net::Message> GetTCP::TcpClient::tryDequeue() { + return concurrent_queue_.tryDequeue(); } asio::awaitable<std::error_code> GetTCP::TcpClient::readLoop(auto& socket) { @@ -203,7 +204,10 @@ asio::awaitable<std::error_code> GetTCP::TcpClient::readLoop(auto& socket) { continue; if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) { - utils::net::Message message{read_message.substr(0, bytes_read), utils::net::IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().remote_endpoint().port()}; + const auto remote_endpoint = socket.lowest_layer().remote_endpoint(); + const auto local_endpoint = socket.lowest_layer().local_endpoint(); + utils::net::Message message{read_message.substr(0, bytes_read), utils::net::IpProtocol::TCP, remote_endpoint.address(), + remote_endpoint.port(), local_endpoint.port()}; if (previous_didnt_end_with_delimiter || current_doesnt_end_with_delimiter) message.is_partial = true; concurrent_queue_.enqueue(std::move(message)); diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h index 4d91b9cd3..7cdfb6496 100644 --- a/extensions/standard-processors/processors/GetTCP.h +++ b/extensions/standard-processors/processors/GetTCP.h @@ -147,14 +147,17 @@ class GetTCP : public core::ProcessorImpl { std::optional<size_t> max_message_size, std::vector<utils::net::ConnectionId> connections, std::shared_ptr<core::logging::Logger> logger); - + TcpClient(const TcpClient&) = delete; + TcpClient(TcpClient&&) = delete; + TcpClient& operator=(const TcpClient&) = delete; + TcpClient& operator=(TcpClient&&) = delete; ~TcpClient(); void run(); void stop(); bool queueEmpty() const; - bool tryDequeue(utils::net::Message& received_message); + std::optional<utils::net::Message> tryDequeue(); private: asio::awaitable<void> doReceiveFrom(const utils::net::ConnectionId& connection_id); diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp index 83aeae98d..2ef226627 100644 --- a/extensions/standard-processors/processors/ListenSyslog.cpp +++ b/extensions/standard-processors/processors/ListenSyslog.cpp @@ -95,8 +95,8 @@ void ListenSyslog::transferAsFlowFile(const utils::net::Message& message, core:: session.writeBuffer(flow_file, message.message_data); flow_file->setAttribute("syslog.protocol", std::string{magic_enum::enum_name(message.protocol)}); - flow_file->setAttribute("syslog.port", std::to_string(message.server_port)); - flow_file->setAttribute("syslog.sender", message.sender_address.to_string()); + flow_file->setAttribute("syslog.port", std::to_string(message.local_port)); + flow_file->setAttribute("syslog.sender", message.remote_address.to_string()); session.transfer(flow_file, valid ? Success : Invalid); } diff --git a/extensions/standard-processors/processors/ListenTCP.cpp b/extensions/standard-processors/processors/ListenTCP.cpp index cc5d2beef..af2fe50ed 100644 --- a/extensions/standard-processors/processors/ListenTCP.cpp +++ b/extensions/standard-processors/processors/ListenTCP.cpp @@ -43,8 +43,8 @@ void ListenTCP::onSchedule(core::ProcessContext& context, core::ProcessSessionFa void ListenTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) { auto flow_file = session.create(); session.writeBuffer(flow_file, message.message_data); - flow_file->setAttribute("tcp.port", std::to_string(message.server_port)); - flow_file->setAttribute("tcp.sender", message.sender_address.to_string()); + flow_file->setAttribute("tcp.port", std::to_string(message.local_port)); + flow_file->setAttribute("tcp.sender", message.remote_address.to_string()); session.transfer(flow_file, Success); } diff --git a/extensions/standard-processors/processors/ListenUDP.cpp b/extensions/standard-processors/processors/ListenUDP.cpp index d917ddf0d..dcc441bea 100644 --- a/extensions/standard-processors/processors/ListenUDP.cpp +++ b/extensions/standard-processors/processors/ListenUDP.cpp @@ -34,8 +34,9 @@ void ListenUDP::onSchedule(core::ProcessContext& context, core::ProcessSessionFa void ListenUDP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) { auto flow_file = session.create(); session.writeBuffer(flow_file, message.message_data); - flow_file->setAttribute("udp.port", std::to_string(message.server_port)); - flow_file->setAttribute("udp.sender", message.sender_address.to_string()); + flow_file->setAttribute(ListeningPort.name, std::to_string(message.local_port)); + flow_file->setAttribute(SenderPort.name, std::to_string(message.remote_port)); + flow_file->setAttribute(Sender.name, message.remote_address.to_string()); session.transfer(flow_file, Success); } diff --git a/extensions/standard-processors/processors/ListenUDP.h b/extensions/standard-processors/processors/ListenUDP.h index b6a706ec9..32ad06405 100644 --- a/extensions/standard-processors/processors/ListenUDP.h +++ b/extensions/standard-processors/processors/ListenUDP.h @@ -63,9 +63,10 @@ class ListenUDP : public NetworkListenerProcessor { EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Messages received successfully will be sent out this relationship."}; EXTENSIONAPI static constexpr auto Relationships = std::array{Success}; - EXTENSIONAPI static constexpr auto PortOutputAttribute = core::OutputAttributeDefinition<0>{"udp.port", {}, "The sending port the messages were received."}; + EXTENSIONAPI static constexpr auto ListeningPort = core::OutputAttributeDefinition<0>{"udp.port", {}, "The listening port on which the messages were received."}; EXTENSIONAPI static constexpr auto Sender = core::OutputAttributeDefinition<0>{"udp.sender", {}, "The sending host of the messages."}; - EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 2>{PortOutputAttribute, Sender}; + EXTENSIONAPI static constexpr auto SenderPort = core::OutputAttributeDefinition<0>{"udp.sender.port", {}, "The sending port of the messages."}; + EXTENSIONAPI static constexpr auto OutputAttributes = std::to_array<core::OutputAttributeReference>({ListeningPort, Sender, SenderPort}); void initialize() override; void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp index 94b869879..40621a98b 100644 --- a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp +++ b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp @@ -30,11 +30,12 @@ void NetworkListenerProcessor::onTrigger(core::ProcessContext&, core::ProcessSes gsl_Expects(max_batch_size_ > 0); size_t logs_processed = 0; while (!server_->queueEmpty() && logs_processed < max_batch_size_) { - utils::net::Message received_message; - if (!server_->tryDequeue(received_message)) + if (const auto received_message = server_->tryDequeue()) { + transferAsFlowFile(received_message.value(), session); + ++logs_processed; + } else { break; - transferAsFlowFile(received_message, session); - ++logs_processed; + } } } diff --git a/extensions/standard-processors/processors/NetworkListenerProcessor.h b/extensions/standard-processors/processors/NetworkListenerProcessor.h index fc9d4d7eb..ed2ed550c 100644 --- a/extensions/standard-processors/processors/NetworkListenerProcessor.h +++ b/extensions/standard-processors/processors/NetworkListenerProcessor.h @@ -18,13 +18,11 @@ #include <memory> #include <string> -#include <utility> +#include <thread> #include "core/ProcessorImpl.h" -#include "minifi-cpp/core/logging/Logger.h" #include "minifi-cpp/core/ProcessContext.h" #include "core/ProcessSession.h" -#include "minifi-cpp/core/Property.h" #include "utils/net/Server.h" namespace org::apache::nifi::minifi::processors { @@ -32,6 +30,10 @@ namespace org::apache::nifi::minifi::processors { class NetworkListenerProcessor : public core::ProcessorImpl { public: using ProcessorImpl::ProcessorImpl; + NetworkListenerProcessor(const NetworkListenerProcessor&) = delete; + NetworkListenerProcessor(NetworkListenerProcessor&&) = delete; + NetworkListenerProcessor& operator=(const NetworkListenerProcessor&) = delete; + NetworkListenerProcessor& operator=(NetworkListenerProcessor&&) = delete; ~NetworkListenerProcessor() override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp index a8dfc1465..b23f954ea 100644 --- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp +++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp @@ -273,8 +273,8 @@ TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") { endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port); } protocol = "UDP"; - CHECK_THAT(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint), MatchesSuccess()); - CHECK_THAT(utils::sendUdpDatagram(invalid_syslog, endpoint), MatchesSuccess()); + CHECK(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint).has_value()); + CHECK(utils::sendUdpDatagram(invalid_syslog, endpoint).has_value()); } SECTION("TCP") { @@ -329,18 +329,18 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog][NetworkListenerProce endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port); } - CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint), MatchesSuccess()); - CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint), MatchesSuccess()); - CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint), MatchesSuccess()); - CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint), MatchesSuccess()); + CHECK(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint).has_value()); + CHECK(utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint).has_value()); + CHECK(utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint).has_value()); + CHECK(utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint).has_value()); - CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint), MatchesSuccess()); - CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint), MatchesSuccess()); - CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint), MatchesSuccess()); - CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint), MatchesSuccess()); + CHECK(utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint).has_value()); + CHECK(utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint).has_value()); + CHECK(utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint).has_value()); + CHECK(utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint).has_value()); - CHECK_THAT(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint), MatchesSuccess()); - CHECK_THAT(utils::sendUdpDatagram(invalid_syslog, endpoint), MatchesSuccess()); + CHECK(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint).has_value()); + CHECK(utils::sendUdpDatagram(invalid_syslog, endpoint).has_value()); } SECTION("TCP") { @@ -456,7 +456,7 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog][Netw endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port); } for (auto i = 0; i < 100; ++i) { - CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint), MatchesSuccess()); + CHECK(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint).has_value()); } CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms)); } diff --git a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp index 10144c63c..c3f4126b2 100644 --- a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp +++ b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp @@ -29,10 +29,11 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::test { -void check_for_attributes(core::FlowFile& flow_file, uint16_t port) { +void check_for_attributes(core::FlowFile& flow_file, uint16_t server_port, uint16_t remote_port) { const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"}; - CHECK(std::to_string(port) == flow_file.getAttribute("udp.port")); + CHECK(std::to_string(server_port) == flow_file.getAttribute("udp.port")); CHECK(ranges::contains(local_addresses, flow_file.getAttribute("udp.sender"))); + CHECK(std::to_string(remote_port) == flow_file.getAttribute("udp.sender.port")); } TEST_CASE("ListenUDP test multiple messages", "[ListenUDP][NetworkListenerProcessor]") { @@ -43,7 +44,6 @@ TEST_CASE("ListenUDP test multiple messages", "[ListenUDP][NetworkListenerProces REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize.name, "2")); auto port = utils::scheduleProcessorOnRandomPort(controller.plan, listen_udp); - asio::ip::udp::endpoint endpoint; SECTION("sending through IPv6", "[IPv6]") { if (utils::isIPv6Disabled()) @@ -55,16 +55,20 @@ TEST_CASE("ListenUDP test multiple messages", "[ListenUDP][NetworkListenerProces } controller.plan->scheduleProcessor(listen_udp); - CHECK_THAT(utils::sendUdpDatagram({"test_message_1"}, endpoint), MatchesSuccess()); - CHECK_THAT(utils::sendUdpDatagram({"another_message"}, endpoint), MatchesSuccess()); + const auto test_message_1_result = utils::sendUdpDatagram({"test_message_1"}, endpoint); + CHECK(test_message_1_result.has_value()); + const auto another_message_result = utils::sendUdpDatagram({"another_message"}, endpoint); + CHECK(another_message_result.has_value()); ProcessorTriggerResult result; REQUIRE(controller.triggerUntil({{ListenUDP::Success, 2}}, result, 300ms, 50ms)); CHECK(result.at(ListenUDP::Success).size() == 2); - CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[0]) == "test_message_1"); - CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[1]) == "another_message"); + const auto test_message_1_flow_file = result.at(ListenUDP::Success)[0]; + CHECK(controller.plan->getContent(test_message_1_flow_file) == "test_message_1"); + const auto another_message_flow_file = result.at(ListenUDP::Success)[1]; + CHECK(controller.plan->getContent(another_message_flow_file) == "another_message"); - check_for_attributes(*result.at(ListenUDP::Success)[0], port); - check_for_attributes(*result.at(ListenUDP::Success)[1], port); + check_for_attributes(*test_message_1_flow_file, port, test_message_1_result.value().port()); + check_for_attributes(*another_message_flow_file, port, another_message_result.value().port()); } TEST_CASE("ListenUDP can be rescheduled", "[ListenUDP][NetworkListenerProcessor]") { @@ -101,7 +105,7 @@ TEST_CASE("ListenUDP max queue and max batch size test", "[ListenUDP][NetworkLis controller.plan->scheduleProcessor(listen_udp); for (auto i = 0; i < 100; ++i) { - CHECK_THAT(utils::sendUdpDatagram({"test_message"}, endpoint), MatchesSuccess()); + CHECK(utils::sendUdpDatagram({"test_message"}, endpoint).has_value()); } CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms)); diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp b/extensions/standard-processors/tests/unit/PutTCPTests.cpp index 469a1a728..5e1e8a359 100644 --- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp +++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp @@ -67,12 +67,13 @@ class CancellableTcpServer : public utils::net::TcpServer { } std::error_code error; auto remote_address = socket.lowest_layer().remote_endpoint(error).address(); + auto remote_port = socket.lowest_layer().remote_endpoint(error).port(); auto cancellable_timer = std::make_shared<asio::steady_timer>(io_context_); cancellable_timers_.push_back(cancellable_timer); if (ssl_data_) - co_spawn(io_context_, secureSession(std::move(socket), std::move(remote_address), port_) || wait_until_cancelled(cancellable_timer), asio::detached); + co_spawn(io_context_, secureSession(std::move(socket), std::move(remote_address), remote_port, port_) || wait_until_cancelled(cancellable_timer), asio::detached); else - co_spawn(io_context_, insecureSession(std::move(socket), std::move(remote_address), port_) || wait_until_cancelled(cancellable_timer), asio::detached); + co_spawn(io_context_, insecureSession(std::move(socket), std::move(remote_address), remote_port, port_) || wait_until_cancelled(cancellable_timer), asio::detached); } } @@ -158,9 +159,8 @@ class PutTCPTestFixture { auto interval = 10ms; auto start_time = std::chrono::system_clock::now(); - utils::net::Message result; while (start_time + timeout > std::chrono::system_clock::now()) { - if (getServer(port)->tryDequeue(result)) + if (const auto result = getServer(port)->tryDequeue()) return result; std::this_thread::sleep_for(interval); } @@ -274,7 +274,7 @@ void receive_success(PutTCPTestFixture& test_fixture, const std::string_view exp if (received_message) { CHECK(received_message->message_data == expected_message); CHECK(received_message->protocol == utils::net::IpProtocol::TCP); - CHECK(!received_message->sender_address.to_string().empty()); + CHECK(!received_message->remote_address.to_string().empty()); } } diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp index 361252aa4..48d035f04 100644 --- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp +++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp @@ -36,9 +36,8 @@ namespace org::apache::nifi::minifi::processors { namespace { std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer& listener, std::chrono::milliseconds timeout = 200ms, std::chrono::milliseconds interval = 10ms) { auto start_time = std::chrono::system_clock::now(); - utils::net::Message result; while (start_time + timeout > std::chrono::system_clock::now()) { - if (listener.tryDequeue(result)) + if (const auto result = listener.tryDequeue()) return result; std::this_thread::sleep_for(interval); } @@ -80,7 +79,7 @@ TEST_CASE("PutUDP", "[putudp]") { REQUIRE(received_message); CHECK(received_message->message_data == message); CHECK(received_message->protocol == utils::net::IpProtocol::UDP); - CHECK(!received_message->sender_address.to_string().empty()); + CHECK(!received_message->remote_address.to_string().empty()); } { @@ -94,7 +93,7 @@ TEST_CASE("PutUDP", "[putudp]") { REQUIRE(received_message); CHECK(received_message->message_data == message); CHECK(received_message->protocol == utils::net::IpProtocol::UDP); - CHECK(!received_message->sender_address.to_string().empty()); + CHECK(!received_message->remote_address.to_string().empty()); } { diff --git a/libminifi/test/libtest/unit/TestUtils.cpp b/libminifi/test/libtest/unit/TestUtils.cpp index b479281e8..c56f44f75 100644 --- a/libminifi/test/libtest/unit/TestUtils.cpp +++ b/libminifi/test/libtest/unit/TestUtils.cpp @@ -188,22 +188,26 @@ std::error_code sendMessagesViaTCP(const std::vector<std::string_view>& contents return {}; } -std::error_code sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) { +std::expected<asio::ip::udp::endpoint, std::error_code> sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) { asio::io_context io_context; asio::ip::udp::socket socket(io_context); std::error_code err; std::ignore = socket.open(remote_endpoint.protocol(), err); - if (err) - return err; + if (err) { + return std::unexpected{err}; + } socket.send_to(content, remote_endpoint, 0, err); - return err; + if (err) { + return std::unexpected{err}; + } + return socket.local_endpoint(); } -std::error_code sendUdpDatagram(const std::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) { +std::expected<asio::ip::udp::endpoint, std::error_code> sendUdpDatagram(const std::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) { return sendUdpDatagram(asio::const_buffer(content.data(), content.size()), remote_endpoint); } -std::error_code sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) { +std::expected<asio::ip::udp::endpoint, std::error_code> sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) { return sendUdpDatagram(asio::buffer(content), remote_endpoint); } diff --git a/libminifi/test/libtest/unit/TestUtils.h b/libminifi/test/libtest/unit/TestUtils.h index d2ca10d83..64af75c7c 100644 --- a/libminifi/test/libtest/unit/TestUtils.h +++ b/libminifi/test/libtest/unit/TestUtils.h @@ -170,11 +170,12 @@ bool countLogOccurrencesUntil(const std::string& pattern, const size_t occurrences, const std::chrono::milliseconds max_duration, const std::chrono::milliseconds wait_time = 50ms); + std::error_code sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint, const std::optional<std::string_view> delimiter = std::nullopt); -std::error_code sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint); +std::expected<asio::ip::udp::endpoint /* local */, std::error_code> sendUdpDatagram(asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint); -std::error_code sendUdpDatagram(const std::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint); -std::error_code sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint); +std::expected<asio::ip::udp::endpoint /* local */, std::error_code> sendUdpDatagram(std::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint); +std::expected<asio::ip::udp::endpoint /* local */, std::error_code> sendUdpDatagram(std::string_view content, const asio::ip::udp::endpoint& remote_endpoint); bool isIPv6Disabled();
