This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit bf5d3d25ba33a7e02eb9bee98767073f08e86464 Author: Martin Zink <[email protected]> AuthorDate: Wed Oct 5 16:45:25 2022 +0200 MINIFICPP-1923 Refactor PutUDP to use asio MINIFICPP-1939 Enable dual stack listening on ListenTCP and ListenSyslog PutUDP should iterate through endpoints (both ipv4 and ipv6) Closes #1412 Signed-off-by: Marton Szasz <[email protected]> --- .../standard-processors/processors/PutUDP.cpp | 94 +++++++-------- extensions/standard-processors/processors/PutUDP.h | 2 - .../tests/unit/ListenSyslogTests.cpp | 131 +++++++++++++++------ .../tests/unit/ListenTcpTests.cpp | 78 ++++++++++-- .../standard-processors/tests/unit/PutUDPTests.cpp | 118 ++++++++++--------- .../include/utils/net/SessionHandlingServer.h | 2 +- libminifi/src/utils/net/UdpServer.cpp | 2 +- libminifi/test/Utils.h | 48 ++++++-- 8 files changed, 307 insertions(+), 168 deletions(-) diff --git a/extensions/standard-processors/processors/PutUDP.cpp b/extensions/standard-processors/processors/PutUDP.cpp index 95000011d..56af0f306 100644 --- a/extensions/standard-processors/processors/PutUDP.cpp +++ b/extensions/standard-processors/processors/PutUDP.cpp @@ -16,29 +16,20 @@ */ #include "PutUDP.h" -#ifdef WIN32 -#ifndef WIN32_LEAN_AND_MEAN -#define WIN32_LEAN_AND_MEAN -#endif -#include <winsock2.h> -#else -#include <netdb.h> -#endif /* WIN32 */ -#include <utility> - -#include "range/v3/view/join.hpp" #include "range/v3/range/conversion.hpp" #include "utils/gsl.h" #include "utils/expected.h" -#include "utils/net/DNS.h" -#include "utils/net/Socket.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/PropertyBuilder.h" #include "core/Resource.h" #include "core/logging/LoggerConfiguration.h" +#include "asio/ip/udp.hpp" + +using asio::ip::udp; + namespace org::apache::nifi::minifi::processors { const core::Property PutUDP::Hostname = core::PropertyBuilder::createProperty("Hostname") @@ -107,51 +98,54 @@ void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons return; } - const auto nonthrowing_sockaddr_ntop = [](const sockaddr* const sa) -> std::string { - return utils::try_expression([sa] { return utils::net::sockaddr_ntop(sa); }).value_or("(n/a)"); + asio::io_context io_context; + + const auto resolve_hostname = [&io_context, &hostname, &port]() -> nonstd::expected<udp::resolver::results_type, std::error_code> { + udp::resolver resolver(io_context); + std::error_code error_code; + auto results = resolver.resolve(hostname, port, error_code); + if (error_code) + return nonstd::make_unexpected(error_code); + return results; }; - const auto debug_log_resolved_names = [&, this](const addrinfo& names) -> decltype(auto) { - if (logger_->should_log(core::logging::LOG_LEVEL::debug)) { - std::vector<std::string> names_vector; - for (const addrinfo* it = &names; it; it = it->ai_next) { - names_vector.push_back(nonthrowing_sockaddr_ntop(it->ai_addr)); + const auto send_data_to_endpoint = [&io_context, &data, &logger = this->logger_](const udp::resolver::results_type& resolved_query) -> nonstd::expected<void, std::error_code> { + std::error_code error; + for (const auto& resolver_entry : resolved_query) { + error.clear(); + udp::socket socket(io_context); + socket.open(resolver_entry.endpoint().protocol(), error); + if (error) { + logger->log_debug("opening %s socket failed due to %s ", resolver_entry.endpoint().protocol() == udp::v4() ? "IPv4" : "IPv6", error.message()); + continue; } - logger_->log_debug("resolved \'%s\' to: %s", - hostname, - names_vector | ranges::views::join(',') | ranges::to<std::string>()); + socket.send_to(asio::buffer(data.buffer), resolver_entry.endpoint(), udp::socket::message_flags{}, error); + if (error) { + core::logging::LOG_DEBUG(logger) << "sending to endpoint " << resolver_entry.endpoint() << " failed due to " << error.message(); + continue; + } + core::logging::LOG_DEBUG(logger) << "sending to endpoint " << resolver_entry.endpoint() << " succeeded"; + return {}; } - return names; + return nonstd::make_unexpected(error); + }; + + const auto transfer_to_success = [&session, &flow_file]() -> void { + session->transfer(flow_file, Success); }; - utils::net::resolveHost(hostname.c_str(), port.c_str(), utils::net::IpProtocol::UDP) - | utils::map(utils::dereference) - | utils::map(debug_log_resolved_names) - | utils::flatMap([](const auto& names) { return utils::net::open_socket(names); }) - | utils::flatMap([&, this](utils::net::OpenSocketResult socket_handle_and_selected_name) -> nonstd::expected<void, std::error_code> { - const auto& [socket_handle, selected_name] = socket_handle_and_selected_name; - logger_->log_debug("connected to %s", nonthrowing_sockaddr_ntop(selected_name->ai_addr)); -#ifdef WIN32 - const char* const buffer_ptr = reinterpret_cast<const char*>(data.buffer.data()); -#else - const void* const buffer_ptr = data.buffer.data(); -#endif - const auto send_result = ::sendto(socket_handle.get(), buffer_ptr, data.buffer.size(), 0, selected_name->ai_addr, selected_name->ai_addrlen); - logger_->log_trace("sendto returned %ld", static_cast<long>(send_result)); // NOLINT: sendto - if (send_result == utils::net::SocketError) { - return nonstd::make_unexpected(utils::net::get_last_socket_error()); - } - session->transfer(flow_file, Success); - return {}; - }) - | utils::orElse([&, this](std::error_code ec) { - gsl_Expects(ec); - logger_->log_error("%s", ec.message()); - session->transfer(flow_file, Failure); - }); + const auto transfer_to_failure = [&session, &flow_file, &logger = this->logger_](std::error_code ec) -> void { + gsl_Expects(ec); + logger->log_error("%s", ec.message()); + session->transfer(flow_file, Failure); + }; + + resolve_hostname() + | utils::flatMap(send_data_to_endpoint) + | utils::map(transfer_to_success) + | utils::orElse(transfer_to_failure); } REGISTER_RESOURCE(PutUDP, Processor); } // namespace org::apache::nifi::minifi::processors - diff --git a/extensions/standard-processors/processors/PutUDP.h b/extensions/standard-processors/processors/PutUDP.h index 5217519fb..cf69fce3d 100644 --- a/extensions/standard-processors/processors/PutUDP.h +++ b/extensions/standard-processors/processors/PutUDP.h @@ -58,8 +58,6 @@ class PutUDP final : public core::Processor { void onTrigger(core::ProcessContext*, core::ProcessSession*) final; private: - std::string hostname_; - std::string port_; std::shared_ptr<core::logging::Logger> logger_; }; } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp index c9b8e472e..302af6c3a 100644 --- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp +++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp @@ -21,6 +21,7 @@ #include "SingleProcessorTestController.h" #include "Utils.h" #include "controllers/SSLContextService.h" +#include "range/v3/algorithm/contains.hpp" using ListenSyslog = org::apache::nifi::minifi::processors::ListenSyslog; @@ -29,6 +30,7 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::test { constexpr uint64_t SYSLOG_PORT = 10255; +constexpr auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"}; struct ValidRFC5424Message { constexpr ValidRFC5424Message(std::string_view message, @@ -197,21 +199,10 @@ constexpr std::string_view rfc5424_logger_example_1 = R"(<13>1 2022-03-17T10:10: constexpr std::string_view invalid_syslog = "not syslog"; -void sendUDPPacket(const std::string_view content, uint64_t port) { - asio::io_context io_context; - asio::ip::udp::socket socket(io_context); - asio::ip::udp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port); - socket.open(asio::ip::udp::v4()); - std::error_code err; - socket.send_to(asio::buffer(content, content.size()), remote_endpoint, 0, err); - REQUIRE(!err); - socket.close(); -} - void check_for_only_basic_attributes(core::FlowFile& flow_file, uint16_t port, std::string_view protocol) { CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port")); CHECK(protocol == flow_file.getAttribute("syslog.protocol")); - CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender")); + CHECK(ranges::contains(local_addresses, flow_file.getAttribute("syslog.sender"))); CHECK(std::nullopt == flow_file.getAttribute("syslog.valid")); CHECK(std::nullopt == flow_file.getAttribute("syslog.priority")); @@ -228,7 +219,7 @@ void check_for_only_basic_attributes(core::FlowFile& flow_file, uint16_t port, s void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC5424Message& original_message, uint16_t port, std::string_view protocol) { CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port")); CHECK(protocol == flow_file.getAttribute("syslog.protocol")); - CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender")); + CHECK(ranges::contains(local_addresses, flow_file.getAttribute("syslog.sender"))); CHECK("true" == flow_file.getAttribute("syslog.valid")); CHECK(original_message.priority_ == flow_file.getAttribute("syslog.priority")); @@ -247,7 +238,7 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC5424 void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164Message& original_message, uint16_t port, std::string_view protocol) { CHECK(std::to_string(port) == flow_file.getAttribute("syslog.port")); CHECK(protocol == flow_file.getAttribute("syslog.protocol")); - CHECK("127.0.0.1" == flow_file.getAttribute("syslog.sender")); + CHECK(ranges::contains(local_addresses, flow_file.getAttribute("syslog.sender"))); CHECK("true" == flow_file.getAttribute("syslog.valid")); CHECK(original_message.priority_ == flow_file.getAttribute("syslog.priority")); @@ -269,19 +260,37 @@ TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog]") { std::string protocol; SECTION("UDP") { + asio::ip::udp::endpoint endpoint; + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT); + } protocol = "UDP"; REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP")); controller.plan->scheduleProcessor(listen_syslog); - sendUDPPacket(rfc5424_logger_example_1, SYSLOG_PORT); - sendUDPPacket(invalid_syslog, SYSLOG_PORT); + utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint); + utils::sendUdpDatagram(invalid_syslog, endpoint); } SECTION("TCP") { + asio::ip::tcp::endpoint endpoint; + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT); + } protocol = "TCP"; REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP")); controller.plan->scheduleProcessor(listen_syslog); - REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT)); - REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT)); + REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint)); + REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint)); } std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result; REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms)); @@ -303,25 +312,43 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") { std::string protocol; SECTION("UDP") { + asio::ip::udp::endpoint endpoint; + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT); + } protocol = "UDP"; REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP")); controller.plan->scheduleProcessor(listen_syslog); std::this_thread::sleep_for(100ms); - sendUDPPacket(rfc5424_doc_example_1.unparsed_, SYSLOG_PORT); - sendUDPPacket(rfc5424_doc_example_2.unparsed_, SYSLOG_PORT); - sendUDPPacket(rfc5424_doc_example_3.unparsed_, SYSLOG_PORT); - sendUDPPacket(rfc5424_doc_example_4.unparsed_, SYSLOG_PORT); - - sendUDPPacket(rfc3164_doc_example_1.unparsed_, SYSLOG_PORT); - sendUDPPacket(rfc3164_doc_example_2.unparsed_, SYSLOG_PORT); - sendUDPPacket(rfc3164_doc_example_3.unparsed_, SYSLOG_PORT); - sendUDPPacket(rfc3164_doc_example_4.unparsed_, SYSLOG_PORT); - - sendUDPPacket(rfc5424_logger_example_1, SYSLOG_PORT); - sendUDPPacket(invalid_syslog, SYSLOG_PORT); + utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint); + utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, endpoint); + utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, endpoint); + utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, endpoint); + + utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, endpoint); + utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, endpoint); + utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, endpoint); + utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, endpoint); + + utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint); + utils::sendUdpDatagram(invalid_syslog, endpoint); } SECTION("TCP") { + asio::ip::tcp::endpoint endpoint; + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT); + } protocol = "TCP"; REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP")); controller.plan->scheduleProcessor(listen_syslog); @@ -329,15 +356,15 @@ TEST_CASE("ListenSyslog with parsing test", "[ListenSyslog]") { REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_, rfc5424_doc_example_2.unparsed_, rfc5424_doc_example_3.unparsed_, - rfc5424_doc_example_4.unparsed_}, SYSLOG_PORT)); + rfc5424_doc_example_4.unparsed_}, endpoint)); REQUIRE(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_, rfc3164_doc_example_2.unparsed_, rfc3164_doc_example_3.unparsed_, - rfc3164_doc_example_4.unparsed_}, SYSLOG_PORT)); + rfc3164_doc_example_4.unparsed_}, endpoint)); - REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, SYSLOG_PORT)); - REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, SYSLOG_PORT)); + REQUIRE(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint)); + REQUIRE(utils::sendMessagesViaTCP({invalid_syslog}, endpoint)); } std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result; @@ -410,19 +437,37 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") { LogTestController::getInstance().setWarn<ListenSyslog>(); SECTION("UDP") { + asio::ip::udp::endpoint endpoint; + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::udp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT); + } REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "UDP")); controller.plan->scheduleProcessor(listen_syslog); for (auto i = 0; i < 100; ++i) { - sendUDPPacket(rfc5424_doc_example_1.unparsed_, SYSLOG_PORT); + utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, endpoint); } CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 50, 300ms, 50ms)); } SECTION("TCP") { + asio::ip::tcp::endpoint endpoint; + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT); + } REQUIRE(listen_syslog->setProperty(ListenSyslog::ProtocolProperty, "TCP")); controller.plan->scheduleProcessor(listen_syslog); for (auto i = 0; i < 100; ++i) { - REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, SYSLOG_PORT)); + REQUIRE(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, endpoint)); } CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms)); } @@ -435,6 +480,15 @@ TEST_CASE("ListenSyslog max queue and max batch size test", "[ListenSyslog]") { } TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") { + asio::ip::tcp::endpoint endpoint; + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), SYSLOG_PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), SYSLOG_PORT); + } const auto listen_syslog = std::make_shared<ListenSyslog>("ListenSyslog"); SingleProcessorTestController controller{listen_syslog}; @@ -454,9 +508,8 @@ TEST_CASE("Test ListenSyslog via TCP with SSL connection", "[ListenSyslog]") { REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, "SSLContextService")); ssl_context_service->enable(); controller.plan->scheduleProcessor(listen_syslog); - REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, SYSLOG_PORT, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt"))); - REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, SYSLOG_PORT, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt"))); - + REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt"))); + REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt"))); std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>> result; REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 50ms)); CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == rfc5424_logger_example_1); diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp index 559db0c2f..247f2e6ff 100644 --- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp +++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp @@ -21,6 +21,7 @@ #include "SingleProcessorTestController.h" #include "Utils.h" #include "controllers/SSLContextService.h" +#include "range/v3/algorithm/contains.hpp" using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP; @@ -32,10 +33,20 @@ constexpr uint64_t PORT = 10254; void check_for_attributes(core::FlowFile& flow_file) { CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port")); - CHECK("127.0.0.1" == flow_file.getAttribute("tcp.sender")); + const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"}; + CHECK(ranges::contains(local_addresses, flow_file.getAttribute("tcp.sender"))); } TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") { + asio::ip::tcp::endpoint endpoint; + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT); + } const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP"); SingleProcessorTestController controller{listen_tcp}; @@ -44,8 +55,8 @@ TEST_CASE("ListenTCP test multiple messages", "[ListenTCP]") { REQUIRE(listen_tcp->setProperty(ListenTCP::MaxBatchSize, "2")); controller.plan->scheduleProcessor(listen_tcp); - REQUIRE(utils::sendMessagesViaTCP({"test_message_1"}, PORT)); - REQUIRE(utils::sendMessagesViaTCP({"another_message"}, PORT)); + REQUIRE(utils::sendMessagesViaTCP({"test_message_1"}, endpoint)); + REQUIRE(utils::sendMessagesViaTCP({"another_message"}, endpoint)); ProcessorTriggerResult result; REQUIRE(controller.triggerUntil({{ListenTCP::Success, 2}}, result, 300s, 50ms)); CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[0]) == "test_message_1"); @@ -68,6 +79,15 @@ TEST_CASE("ListenTCP can be rescheduled", "[ListenTCP]") { } TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") { + asio::ip::tcp::endpoint endpoint; + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT); + } const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP"); SingleProcessorTestController controller{listen_tcp}; @@ -79,7 +99,7 @@ TEST_CASE("ListenTCP max queue and max batch size test", "[ListenTCP]") { controller.plan->scheduleProcessor(listen_tcp); for (auto i = 0; i < 100; ++i) { - REQUIRE(utils::sendMessagesViaTCP({"test_message"}, PORT)); + REQUIRE(utils::sendMessagesViaTCP({"test_message"}, endpoint)); } CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 50, 300ms, 50ms)); @@ -110,27 +130,61 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") { REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::SSLContextService.getName(), "SSLContextService")); std::vector<std::string> expected_successful_messages; + asio::ip::tcp::endpoint endpoint; + SECTION("Without client certificate verification") { SECTION("Client certificate not required, Client Auth set to NONE by default") { + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT); + } } SECTION("Client certificate not required, but validated if provided") { REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT")); + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT); + } } ssl_context_service->enable(); controller.plan->scheduleProcessor(listen_tcp); expected_successful_messages = {"test_message_1", "another_message"}; - for (const auto& message : expected_successful_messages) { - REQUIRE(utils::sendMessagesViaSSL({message}, PORT, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt"))); + for (const auto& message: expected_successful_messages) { + REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt"))); } } SECTION("With client certificate provided") { SECTION("Client certificate required") { REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED")); + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT); + } } SECTION("Client certificate not required but validated") { REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "WANT")); + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT); + } } ssl_context_service->enable(); controller.plan->scheduleProcessor(listen_tcp); @@ -143,16 +197,24 @@ TEST_CASE("Test ListenTCP with SSL connection", "[ListenTCP]") { expected_successful_messages = {"test_message_1", "another_message"}; for (const auto& message : expected_successful_messages) { - REQUIRE(utils::sendMessagesViaSSL({message}, PORT, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_cert.crt", ssl_data)); + REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_cert.crt", ssl_data)); } } SECTION("Required certificate not provided") { + SECTION("sending through IPv4", "[IPv4]") { + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), PORT); + } + SECTION("sending through IPv6", "[IPv6]") { + if (utils::isIPv6Disabled()) + return; + endpoint = asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), PORT); + } REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::ClientAuth.getName(), "REQUIRED")); ssl_context_service->enable(); controller.plan->scheduleProcessor(listen_tcp); - REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, PORT, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt"))); + REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt"))); } ProcessorTriggerResult result; diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp index 9e8e95247..fd313d928 100644 --- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp +++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp @@ -24,78 +24,59 @@ #include "Catch.h" #include "PutUDP.h" #include "core/ProcessContext.h" -#include "utils/net/DNS.h" -#include "utils/net/Socket.h" +#include "utils/net/UdpServer.h" #include "utils/expected.h" #include "utils/StringUtils.h" +using namespace std::literals::chrono_literals; + namespace org::apache::nifi::minifi::processors { namespace { -struct DatagramListener { - DatagramListener(const char* const hostname, const char* const port) - :resolved_names_{utils::net::resolveHost(hostname, port, utils::net::IpProtocol::UDP).value()}, - open_socket_{utils::net::open_socket(*resolved_names_) - | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ", hostname, " on port ", port)}; })} - { - const auto bind_result = bind(open_socket_.socket_.get(), open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen); - if (bind_result == utils::net::SocketError) { - throw std::runtime_error{utils::StringUtils::join_pack("bind: ", utils::net::get_last_socket_error().message())}; - } - } - - struct ReceiveResult { - std::string remote_address; - std::string message; - }; - - [[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192) const { - ReceiveResult result; - result.message.resize(max_message_size); - sockaddr_storage remote_address{}; - socklen_t addrlen = sizeof(remote_address); - const auto recv_result = recvfrom(open_socket_.socket_.get(), result.message.data(), result.message.size(), 0, std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen); - if (recv_result == utils::net::SocketError) { - throw std::runtime_error{utils::StringUtils::join_pack("recvfrom: ", utils::net::get_last_socket_error().message())}; - } - result.message.resize(gsl::narrow<size_t>(recv_result)); - result.remote_address = utils::net::sockaddr_ntop(std::launder(reinterpret_cast<sockaddr*>(&remote_address))); - return result; +std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer& listener, std::chrono::milliseconds timeout = 200ms, std::chrono::milliseconds interval = 10ms) { + auto start_time = std::chrono::system_clock::now(); + utils::net::Message result; + while (start_time + timeout > std::chrono::system_clock::now()) { + if (listener.tryDequeue(result)) + return result; + std::this_thread::sleep_for(interval); } - - std::unique_ptr<addrinfo, utils::net::addrinfo_deleter> resolved_names_; - utils::net::OpenSocketResult open_socket_; -}; + return std::nullopt; +} } // namespace -// Testing the failure relationship is not required, because since UDP in general without guarantees, flow files are always routed to success, unless there is -// some weird IO error with the content repo. TEST_CASE("PutUDP", "[putudp]") { - const auto putudp = std::make_shared<PutUDP>("PutUDP"); + const auto put_udp = std::make_shared<PutUDP>("PutUDP"); auto random_engine = std::mt19937{std::random_device{}()}; // NOLINT: "Missing space before { [whitespace/braces] [5]" // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding to those const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 1}(random_engine); - const auto port_str = std::to_string(port); - test::SingleProcessorTestController controller{putudp}; + test::SingleProcessorTestController controller{put_udp}; LogTestController::getInstance().setTrace<PutUDP>(); LogTestController::getInstance().setTrace<core::ProcessContext>(); - LogTestController::getInstance().setLevelByClassName(spdlog::level::trace, "org::apache::nifi::minifi::core::ProcessContextExpr"); - putudp->setProperty(PutUDP::Hostname, "${literal('localhost')}"); - putudp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", port_str, "')}")); + put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}"); + put_udp->setProperty(PutUDP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}")); - DatagramListener listener{"localhost", port_str.c_str()}; + utils::net::UdpServer listener{std::nullopt, port, core::logging::LoggerFactory<utils::net::UdpServer>().getLogger()}; + + auto server_thread = std::thread([&listener]() { listener.run(); }); + auto cleanup_server = gsl::finally([&]{ + listener.stop(); + server_thread.join(); + }); { const char* const message = "first message: hello"; const auto result = controller.trigger(message); const auto& success_flow_files = result.at(PutUDP::Success); REQUIRE(success_flow_files.size() == 1); - REQUIRE(result.at(PutUDP::Failure).empty()); - REQUIRE(controller.plan->getContent(success_flow_files[0]) == message); - auto receive_result = listener.receive(); - REQUIRE(receive_result.message == message); - REQUIRE(!receive_result.remote_address.empty()); + CHECK(result.at(PutUDP::Failure).empty()); + CHECK(controller.plan->getContent(success_flow_files[0]) == message); + auto received_message = tryDequeueWithTimeout(listener); + REQUIRE(received_message); + CHECK(received_message->message_data == message); + CHECK(received_message->protocol == utils::net::IpProtocol::UDP); + CHECK(!received_message->sender_address.to_string().empty()); } { @@ -103,12 +84,39 @@ TEST_CASE("PutUDP", "[putudp]") { const auto result = controller.trigger(message); const auto& success_flow_files = result.at(PutUDP::Success); REQUIRE(success_flow_files.size() == 1); - REQUIRE(result.at(PutUDP::Failure).empty()); - REQUIRE(controller.plan->getContent(success_flow_files[0]) == message); - auto receive_result = listener.receive(); - REQUIRE(receive_result.message == message); - REQUIRE(!receive_result.remote_address.empty()); + CHECK(result.at(PutUDP::Failure).empty()); + CHECK(controller.plan->getContent(success_flow_files[0]) == message); + auto received_message = tryDequeueWithTimeout(listener); + REQUIRE(received_message); + CHECK(received_message->message_data == message); + CHECK(received_message->protocol == utils::net::IpProtocol::UDP); + CHECK(!received_message->sender_address.to_string().empty()); } -} + { + LogTestController::getInstance().clear(); + auto message = std::string(65536, 'a'); + const auto result = controller.trigger(message); + const auto& failure_flow_files = result.at(PutUDP::Failure); + REQUIRE(failure_flow_files.size() == 1); + CHECK(result.at(PutUDP::Success).empty()); + CHECK(controller.plan->getContent(failure_flow_files[0]) == message); + CHECK((LogTestController::getInstance().contains("Message too long") + || LogTestController::getInstance().contains("A message sent on a datagram socket was larger than the internal message buffer"))); + } + + { + LogTestController::getInstance().clear(); + const char* const message = "message for invalid host"; + controller.plan->setProperty(put_udp, PutUDP::Hostname.getName(), "invalid_hostname"); + const auto result = controller.trigger(message); + const auto& failure_flow_files = result.at(PutUDP::Failure); + auto received_message = tryDequeueWithTimeout(listener); + CHECK(!received_message); + REQUIRE(failure_flow_files.size() == 1); + CHECK(result.at(PutUDP::Success).empty()); + CHECK(controller.plan->getContent(failure_flow_files[0]) == message); + CHECK((LogTestController::getInstance().contains("Host not found") || LogTestController::getInstance().contains("No such host is known"))); + } +} } // namespace org::apache::nifi::minifi::processors diff --git a/libminifi/include/utils/net/SessionHandlingServer.h b/libminifi/include/utils/net/SessionHandlingServer.h index f716a8758..173fdcb02 100644 --- a/libminifi/include/utils/net/SessionHandlingServer.h +++ b/libminifi/include/utils/net/SessionHandlingServer.h @@ -29,7 +29,7 @@ class SessionHandlingServer : public Server { public: SessionHandlingServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger) : Server(max_queue_size, std::move(logger)), - acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) { + acceptor_(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port)) { } void run() override { diff --git a/libminifi/src/utils/net/UdpServer.cpp b/libminifi/src/utils/net/UdpServer.cpp index 490a081e7..6137f2ddf 100644 --- a/libminifi/src/utils/net/UdpServer.cpp +++ b/libminifi/src/utils/net/UdpServer.cpp @@ -22,7 +22,7 @@ UdpServer::UdpServer(std::optional<size_t> max_queue_size, uint16_t port, std::shared_ptr<core::logging::Logger> logger) : Server(max_queue_size, std::move(logger)), - socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v4(), port)) { + socket_(io_context_, asio::ip::udp::endpoint(asio::ip::udp::v6(), port)) { doReceive(); } diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h index 96389f740..76d8cc533 100644 --- a/libminifi/test/Utils.h +++ b/libminifi/test/Utils.h @@ -111,24 +111,47 @@ bool countLogOccurrencesUntil(const std::string& pattern, return false; } -bool sendMessagesViaTCP(const std::vector<std::string_view>& contents, uint64_t port) { +bool sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint) { asio::io_context io_context; asio::ip::tcp::socket socket(io_context); - asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port); socket.connect(remote_endpoint); std::error_code err; for (auto& content : contents) { std::string tcp_message(content); tcp_message += '\n'; asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err); + if (err) { + return false; + } } - if (err) { - return false; - } - socket.close(); return true; } +bool sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) { + asio::io_context io_context; + asio::ip::udp::socket socket(io_context); + socket.open(remote_endpoint.protocol()); + std::error_code err; + socket.send_to(content, remote_endpoint, 0, err); + return !err; +} + +bool sendUdpDatagram(const gsl::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) { + return sendUdpDatagram(asio::const_buffer(content.begin(), content.size()), remote_endpoint); +} + +bool sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) { + return sendUdpDatagram(asio::buffer(content), remote_endpoint); +} + +bool isIPv6Disabled() { + asio::io_context io_context; + std::error_code error_code; + asio::ip::tcp::socket socket_tcp(io_context); + socket_tcp.connect(asio::ip::tcp::endpoint(asio::ip::address_v6::loopback(), 10), error_code); + return error_code.value() == EADDRNOTAVAIL; +} + struct ConnectionTestAccessor { FIELD_ACCESSOR(queue_); }; @@ -143,7 +166,10 @@ struct FlowFileQueueTestAccessor { FIELD_ACCESSOR(queue_); }; -bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, uint64_t port, const std::string& ca_cert_path, const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) { +bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, + const asio::ip::tcp::endpoint& remote_endpoint, + const std::string& ca_cert_path, + const std::optional<minifi::utils::net::SslData>& ssl_data = std::nullopt) { asio::ssl::context ctx(asio::ssl::context::sslv23); ctx.load_verify_file(ca_cert_path); if (ssl_data) { @@ -154,7 +180,6 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, uint64_t } asio::io_context io_context; asio::ssl::stream<asio::ip::tcp::socket> socket(io_context, ctx); - asio::ip::tcp::endpoint remote_endpoint(asio::ip::address::from_string("127.0.0.1"), port); asio::error_code err; socket.lowest_layer().connect(remote_endpoint, err); if (err) { @@ -168,11 +193,10 @@ bool sendMessagesViaSSL(const std::vector<std::string_view>& contents, uint64_t std::string tcp_message(content); tcp_message += '\n'; asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err); + if (err) { + return false; + } } - if (err) { - return false; - } - socket.lowest_layer().close(); return true; }
