martinzink commented on code in PR #1412:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1412#discussion_r958121582
##########
extensions/standard-processors/tests/unit/PutUDPTests.cpp:
##########
@@ -24,91 +24,86 @@
#include "Catch.h"
#include "PutUDP.h"
#include "core/ProcessContext.h"
-#include "utils/net/DNS.h"
-#include "utils/net/Socket.h"
+#include "utils/net/UdpServer.h"
#include "utils/expected.h"
#include "utils/StringUtils.h"
+using namespace std::literals::chrono_literals;
+
namespace org::apache::nifi::minifi::processors {
namespace {
-struct DatagramListener {
- DatagramListener(const char* const hostname, const char* const port)
- :resolved_names_{utils::net::resolveHost(hostname, port,
utils::net::IpProtocol::UDP).value()},
- open_socket_{utils::net::open_socket(*resolved_names_)
- | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw
std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ",
hostname, " on port ", port)}; })}
- {
- const auto bind_result = bind(open_socket_.socket_.get(),
open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen);
- if (bind_result == utils::net::SocketError) {
- throw std::runtime_error{utils::StringUtils::join_pack("bind: ",
utils::net::get_last_socket_error().message())};
- }
- }
-
- struct ReceiveResult {
- std::string remote_address;
- std::string message;
- };
-
- [[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192)
const {
- ReceiveResult result;
- result.message.resize(max_message_size);
- sockaddr_storage remote_address{};
- socklen_t addrlen = sizeof(remote_address);
- const auto recv_result = recvfrom(open_socket_.socket_.get(),
result.message.data(), result.message.size(), 0,
std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen);
- if (recv_result == utils::net::SocketError) {
- throw std::runtime_error{utils::StringUtils::join_pack("recvfrom: ",
utils::net::get_last_socket_error().message())};
- }
- result.message.resize(gsl::narrow<size_t>(recv_result));
- result.remote_address =
utils::net::sockaddr_ntop(std::launder(reinterpret_cast<sockaddr*>(&remote_address)));
- return result;
+std::optional<utils::net::Message>
tryDequeueWithTimeout(utils::net::UdpServer& listener,
std::chrono::milliseconds timeout = 200ms, std::chrono::milliseconds interval =
10ms) {
+ auto start_time = std::chrono::system_clock::now();
+ utils::net::Message result;
+ while (start_time + timeout > std::chrono::system_clock::now()) {
+ if (listener.tryDequeue(result))
+ return result;
+ std::this_thread::sleep_for(interval);
}
-
- std::unique_ptr<addrinfo, utils::net::addrinfo_deleter> resolved_names_;
- utils::net::OpenSocketResult open_socket_;
-};
+ return std::nullopt;
+}
} // namespace
-// Testing the failure relationship is not required, because since UDP in
general without guarantees, flow files are always routed to success, unless
there is
-// some weird IO error with the content repo.
Review Comment:
This was not strictly true, because the hostname resolution could fail
(added a test-case for that).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]