martinzink commented on code in PR #1412:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1412#discussion_r968314083


##########
extensions/standard-processors/tests/unit/PutUDPTests.cpp:
##########
@@ -24,91 +24,87 @@
 #include "Catch.h"
 #include "PutUDP.h"
 #include "core/ProcessContext.h"
-#include "utils/net/DNS.h"
-#include "utils/net/Socket.h"
+#include "utils/net/UdpServer.h"
 #include "utils/expected.h"
 #include "utils/StringUtils.h"
 
+using namespace std::literals::chrono_literals;
+
 namespace org::apache::nifi::minifi::processors {
 
 namespace {
-struct DatagramListener {
-  DatagramListener(const char* const hostname, const char* const port)
-    :resolved_names_{utils::net::resolveHost(hostname, port, 
utils::net::IpProtocol::UDP).value()},
-     open_socket_{utils::net::open_socket(*resolved_names_)
-        | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw 
std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ", 
hostname, " on port ", port)}; })}
-  {
-    const auto bind_result = bind(open_socket_.socket_.get(), 
open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen);
-    if (bind_result == utils::net::SocketError) {
-      throw std::runtime_error{utils::StringUtils::join_pack("bind: ", 
utils::net::get_last_socket_error().message())};
-    }
+std::optional<utils::net::Message> 
tryDequeueWithTimeout(utils::net::UdpServer& listener, 
std::chrono::milliseconds timeout = 200ms, std::chrono::milliseconds interval = 
10ms) {
+  auto start_time = std::chrono::system_clock::now();
+  utils::net::Message result;
+  while (start_time + timeout > std::chrono::system_clock::now()) {
+    if (listener.tryDequeue(result))
+      return result;
+    std::this_thread::sleep_for(interval);
   }
-
-  struct ReceiveResult {
-    std::string remote_address;
-    std::string message;
-  };
-
-  [[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192) 
const {
-    ReceiveResult result;
-    result.message.resize(max_message_size);
-    sockaddr_storage remote_address{};
-    socklen_t addrlen = sizeof(remote_address);
-    const auto recv_result = recvfrom(open_socket_.socket_.get(), 
result.message.data(), result.message.size(), 0, 
std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen);
-    if (recv_result == utils::net::SocketError) {
-      throw std::runtime_error{utils::StringUtils::join_pack("recvfrom: ", 
utils::net::get_last_socket_error().message())};
-    }
-    result.message.resize(gsl::narrow<size_t>(recv_result));
-    result.remote_address = 
utils::net::sockaddr_ntop(std::launder(reinterpret_cast<sockaddr*>(&remote_address)));
-    return result;
-  }
-
-  std::unique_ptr<addrinfo, utils::net::addrinfo_deleter> resolved_names_;
-  utils::net::OpenSocketResult open_socket_;
-};
+  return std::nullopt;
+}
 }  // namespace
 
-// Testing the failure relationship is not required, because since UDP in 
general without guarantees, flow files are always routed to success, unless 
there is
-// some weird IO error with the content repo.
 TEST_CASE("PutUDP", "[putudp]") {
-  const auto putudp = std::make_shared<PutUDP>("PutUDP");
+  const auto put_udp = std::make_shared<PutUDP>("PutUDP");
   auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: 
"Missing space before {  [whitespace/braces] [5]"
   // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding 
to those
   const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 
1}(random_engine);
-  const auto port_str = std::to_string(port);
 
-  test::SingleProcessorTestController controller{putudp};
+  test::SingleProcessorTestController controller{put_udp};
   LogTestController::getInstance().setTrace<PutUDP>();
   LogTestController::getInstance().setTrace<core::ProcessContext>();
   LogTestController::getInstance().setLevelByClassName(spdlog::level::trace, 
"org::apache::nifi::minifi::core::ProcessContextExpr");
-  putudp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
-  putudp->setProperty(PutUDP::Port, 
utils::StringUtils::join_pack("${literal('", port_str, "')}"));
+  put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
+  put_udp->setProperty(PutUDP::Port, 
utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  DatagramListener listener{"localhost", port_str.c_str()};
+  utils::net::UdpServer listener{std::nullopt, port, 
core::logging::LoggerFactory<utils::net::UdpServer>().getLogger()};
+
+  auto server_thread = std::thread([&listener]() { listener.run(); });
+  auto cleanup_server = gsl::finally([&]{
+    listener.stop();
+    server_thread.join();
+  });
 
   {
     const char* const message = "first message: hello";
     const auto result = controller.trigger(message);
     const auto& success_flow_files = result.at(PutUDP::Success);
     REQUIRE(success_flow_files.size() == 1);
-    REQUIRE(result.at(PutUDP::Failure).empty());
-    REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
-    auto receive_result = listener.receive();
-    REQUIRE(receive_result.message == message);
-    REQUIRE(!receive_result.remote_address.empty());
+    CHECK(result.at(PutUDP::Failure).empty());
+    CHECK(controller.plan->getContent(success_flow_files[0]) == message);
+    auto received_message = tryDequeueWithTimeout(listener);
+    REQUIRE(received_message);
+    CHECK(received_message->message_data == message);
+    CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
+    CHECK(!received_message->sender_address.to_string().empty());
   }
 
   {
     const char* const message = "longer message 
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
  // NOLINT
     const auto result = controller.trigger(message);
     const auto& success_flow_files = result.at(PutUDP::Success);
     REQUIRE(success_flow_files.size() == 1);
-    REQUIRE(result.at(PutUDP::Failure).empty());
-    REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
-    auto receive_result = listener.receive();
-    REQUIRE(receive_result.message == message);
-    REQUIRE(!receive_result.remote_address.empty());
+    CHECK(result.at(PutUDP::Failure).empty());
+    CHECK(controller.plan->getContent(success_flow_files[0]) == message);
+    auto received_message = tryDequeueWithTimeout(listener);
+    REQUIRE(received_message);
+    CHECK(received_message->message_data == message);
+    CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
+    CHECK(!received_message->sender_address.to_string().empty());
   }
-}
 
+  {
+    const char* const message = "message for invalid host";
+    controller.plan->setProperty(put_udp, PutUDP::Hostname.getName(), 
"invalid_hostname");
+    const auto result = controller.trigger(message);
+    const auto& failure_flow_files = result.at(PutUDP::Failure);
+    auto received_message = tryDequeueWithTimeout(listener);
+    CHECK(!received_message);
+    REQUIRE(failure_flow_files.size() == 1);
+    CHECK(result.at(PutUDP::Success).empty());
+    CHECK(controller.plan->getContent(failure_flow_files[0]) == message);
+    CHECK((LogTestController::getInstance().contains("Host not found") || 
LogTestController::getInstance().contains("No such host is known")));

Review Comment:
   good idea, changed it in 
https://github.com/apache/nifi-minifi-cpp/pull/1412/commits/45948030831865d2a71d6a47135a2f6deac823d0#diff-f690439939269c49549e72231c05edf2e8ba912ec3fe206031d6465b7d161779R98



##########
extensions/standard-processors/processors/PutUDP.cpp:
##########
@@ -107,48 +98,48 @@ void PutUDP::onTrigger(core::ProcessContext* context, 
core::ProcessSession* cons
     return;
   }
 
-  const auto nonthrowing_sockaddr_ntop = [](const sockaddr* const sa) -> 
std::string {
-    return utils::try_expression([sa] { return utils::net::sockaddr_ntop(sa); 
}).value_or("(n/a)");
+  asio::io_context io_context;
+
+  const auto resolve_hostname = [&io_context, &hostname, &port]() -> 
nonstd::expected<udp::resolver::results_type, std::error_code> {
+    udp::resolver resolver(io_context);
+    std::error_code error_code;
+    auto resolved_query = resolver.resolve(udp::v4(), hostname, port, 
error_code);
+    if (error_code)
+      return nonstd::make_unexpected(error_code);
+    return resolved_query;
+  };
+
+  const auto debug_log_resolved_endpoint = [&hostname, &logger = 
this->logger_](const udp::resolver::results_type& resolved_query) -> 
udp::endpoint {
+    if (logger->should_log(core::logging::LOG_LEVEL::debug))
+      core::logging::LOG_WARN(logger) << "resolved " << hostname << " to: " << 
resolved_query->endpoint();

Review Comment:
   you are right, fixed it 
https://github.com/apache/nifi-minifi-cpp/pull/1412/commits/45948030831865d2a71d6a47135a2f6deac823d0#diff-e126536bed52648bd8b876e5a90622a3ebfc00207b7c0b10ca7f1f9605e67c0eR114



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to