This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit b6d0eb137bd63e4d5b287a932bea8df9cc41dc48
Author: Marton Szasz <[email protected]>
AuthorDate: Thu Jan 8 05:32:26 2026 +0100

    MINIFICPP-2701 ListenUDP udp.sender.port property
    
    - added tryDequeue overload with optional return type to
      MinifiConcurrentQueue
    - renamed Message's sender_address / server_port to
      remote_address / local_port
    - added remote port to Message, adapted all the dependent files
    - added udp.sender.port output attribute to ListenUDP
    - extended test utility to capture the local endpoint after UDP send
    - extended ListenUDPTests to test that the new output attribute is
      set appropriately
    
    Closes #2084
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
---
 PROCESSORS.md                                      |  9 ++++---
 .../include/utils/MinifiConcurrentQueue.h          | 15 ++++++++---
 extension-framework/include/utils/net/Message.h    | 15 +++++------
 extension-framework/include/utils/net/Server.h     |  8 ++++--
 extension-framework/include/utils/net/TcpServer.h  |  6 ++---
 extension-framework/include/utils/net/UdpServer.h  |  2 --
 extension-framework/src/utils/net/TcpServer.cpp    | 29 ++++++++++++----------
 extension-framework/src/utils/net/UdpServer.cpp    |  9 +++----
 .../standard-processors/processors/GetTCP.cpp      | 20 +++++++++------
 extensions/standard-processors/processors/GetTCP.h |  7 ++++--
 .../processors/ListenSyslog.cpp                    |  4 +--
 .../standard-processors/processors/ListenTCP.cpp   |  4 +--
 .../standard-processors/processors/ListenUDP.cpp   |  5 ++--
 .../standard-processors/processors/ListenUDP.h     |  5 ++--
 .../processors/NetworkListenerProcessor.cpp        |  9 ++++---
 .../processors/NetworkListenerProcessor.h          |  8 +++---
 .../tests/unit/ListenSyslogTests.cpp               | 26 +++++++++----------
 .../tests/unit/ListenUDPTests.cpp                  | 24 ++++++++++--------
 .../standard-processors/tests/unit/PutTCPTests.cpp | 10 ++++----
 .../standard-processors/tests/unit/PutUDPTests.cpp |  7 +++---
 libminifi/test/libtest/unit/TestUtils.cpp          | 16 +++++++-----
 libminifi/test/libtest/unit/TestUtils.h            |  7 +++---
 22 files changed, 140 insertions(+), 105 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 1fc8af238..7851a06af 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -1565,10 +1565,11 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 
 ### Output Attributes
 
-| Attribute  | Relationship | Description                                  |
-|------------|--------------|----------------------------------------------|
-| udp.port   |              | The sending port the messages were received. |
-| udp.sender |              | The sending host of the messages.            |
+| Attribute       | Relationship | Description                                 
            |
+|-----------------|--------------|---------------------------------------------------------|
+| udp.port        |              | The listening port on which the messages 
were received. |
+| udp.sender      |              | The sending host of the messages.           
            |
+| udp.sender.port |              | The sending port of the messages.           
            |
 
 
 ## ListFile
diff --git a/core-framework/include/utils/MinifiConcurrentQueue.h 
b/core-framework/include/utils/MinifiConcurrentQueue.h
index 490cc601a..04dead821 100644
--- a/core-framework/include/utils/MinifiConcurrentQueue.h
+++ b/core-framework/include/utils/MinifiConcurrentQueue.h
@@ -17,15 +17,17 @@
 #pragma once
 
 #include <algorithm>
+#include <atomic>
 #include <chrono>
+#include <condition_variable>
 #include <deque>
 #include <mutex>
-#include <condition_variable>
-#include <utility>
+#include <optional>
 #include <stdexcept>
-#include <atomic>
+#include <utility>
 
 #include "utils/TryMoveCall.h"
+#include "minifi-cpp/utils/gsl.h"
 
 namespace org::apache::nifi::minifi::utils {
 
@@ -56,6 +58,13 @@ class ConcurrentQueue {
     return tryDequeueImpl(lck, out);
   }
 
+  std::optional<T> tryDequeue() {
+    std::optional<T> result = std::nullopt;
+    const bool consume_result = consume([&result](T value) { result = 
std::move(value); });
+    gsl_Assert(consume_result == result.has_value());
+    return result;
+  }
+
   template<typename Functor>
   bool consume(Functor&& fun) {
     std::unique_lock<std::mutex> lck(mtx_);
diff --git a/extension-framework/include/utils/net/Message.h 
b/extension-framework/include/utils/net/Message.h
index 2cc05f717..499a1f5aa 100644
--- a/extension-framework/include/utils/net/Message.h
+++ b/extension-framework/include/utils/net/Message.h
@@ -25,20 +25,21 @@
 namespace org::apache::nifi::minifi::utils::net {
 
 struct Message {
- public:
-  Message() = default;
-  Message(std::string message_data, IpProtocol protocol, asio::ip::address 
sender_address, asio::ip::port_type server_port)
+  Message() = delete;
+  Message(std::string message_data, IpProtocol protocol, asio::ip::address 
remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port)
       : message_data(std::move(message_data)),
       protocol(protocol),
-      server_port(server_port),
-      sender_address(std::move(sender_address)) {
+      remote_address(std::move(remote_address)),
+      remote_port(remote_port),
+      local_port(local_port) {
   }
 
   bool is_partial = false;
   std::string message_data;
   IpProtocol protocol;
-  asio::ip::port_type server_port;
-  asio::ip::address sender_address;
+  asio::ip::address remote_address;
+  asio::ip::port_type remote_port;
+  asio::ip::port_type local_port;
 };
 
 }  // namespace org::apache::nifi::minifi::utils::net
diff --git a/extension-framework/include/utils/net/Server.h 
b/extension-framework/include/utils/net/Server.h
index bda53c7fa..34d6d89a8 100644
--- a/extension-framework/include/utils/net/Server.h
+++ b/extension-framework/include/utils/net/Server.h
@@ -47,9 +47,13 @@ class Server {
   bool queueEmpty() {
     return concurrent_queue_.empty();
   }
-  bool tryDequeue(utils::net::Message& received_message) {
-    return concurrent_queue_.tryDequeue(received_message);
+  std::optional<utils::net::Message> tryDequeue() {
+    return concurrent_queue_.tryDequeue();
   }
+  Server(const Server&) = delete;
+  Server(Server&&) = delete;
+  Server& operator=(const Server&) = delete;
+  Server& operator=(Server&&) = delete;
   virtual ~Server() {
     stop();
   }
diff --git a/extension-framework/include/utils/net/TcpServer.h 
b/extension-framework/include/utils/net/TcpServer.h
index 00f4146d3..0301e6a3e 100644
--- a/extension-framework/include/utils/net/TcpServer.h
+++ b/extension-framework/include/utils/net/TcpServer.h
@@ -41,10 +41,10 @@ class TcpServer : public Server {
  protected:
   asio::awaitable<void> doReceive() override;
 
-  asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket, 
asio::ip::address remote_address, asio::ip::port_type local_port);
-  asio::awaitable<void> secureSession(asio::ip::tcp::socket socket, 
asio::ip::address remote_address, asio::ip::port_type local_port);
+  asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket, 
asio::ip::address remote_address, asio::ip::port_type remote_port, 
asio::ip::port_type local_port);
+  asio::awaitable<void> secureSession(asio::ip::tcp::socket socket, 
asio::ip::address remote_address, asio::ip::port_type remote_port, 
asio::ip::port_type local_port);
 
-  asio::awaitable<void> readLoop(auto& socket, const auto& remote_address, 
const auto& local_port);
+  asio::awaitable<void> readLoop(auto& socket, asio::ip::address 
remote_address, asio::ip::port_type remote_port, asio::ip::port_type 
local_port);
 
   bool consume_delimiter_;
   const std::string delimiter_;
diff --git a/extension-framework/include/utils/net/UdpServer.h 
b/extension-framework/include/utils/net/UdpServer.h
index 4fb2dcb93..c84dc01d9 100644
--- a/extension-framework/include/utils/net/UdpServer.h
+++ b/extension-framework/include/utils/net/UdpServer.h
@@ -18,11 +18,9 @@
 
 #include <optional>
 #include <memory>
-#include <string>
 #include <asio/awaitable.hpp>
 
 #include "Server.h"
-#include "utils/MinifiConcurrentQueue.h"
 #include "minifi-cpp/core/logging/Logger.h"
 #include "core/logging/LoggerFactory.h"
 
diff --git a/extension-framework/src/utils/net/TcpServer.cpp 
b/extension-framework/src/utils/net/TcpServer.cpp
index 1c51fa265..abcf6b827 100644
--- a/extension-framework/src/utils/net/TcpServer.cpp
+++ b/extension-framework/src/utils/net/TcpServer.cpp
@@ -34,20 +34,23 @@ asio::awaitable<void> TcpServer::doReceive() {
       continue;
     }
     std::error_code error;
-    auto remote_address = 
socket.lowest_layer().remote_endpoint(error).address();
-    if (error)
+    auto remote_endpoint = socket.lowest_layer().remote_endpoint(error);
+    if (error) {
       logger_->log_warn("Error during fetching remote endpoint: {}", 
error.message());
+    }
     auto local_port = socket.lowest_layer().local_endpoint(error).port();
-    if (error)
+    if (error) {
       logger_->log_warn("Error during fetching local endpoint: {}", 
error.message());
-    if (ssl_data_)
-      co_spawn(io_context_, secureSession(std::move(socket), 
std::move(remote_address), local_port), asio::detached);
-    else
-      co_spawn(io_context_, insecureSession(std::move(socket), 
std::move(remote_address), local_port), asio::detached);
+    }
+    if (ssl_data_) {
+      co_spawn(io_context_, secureSession(std::move(socket), 
remote_endpoint.address(), remote_endpoint.port(), local_port), asio::detached);
+    } else {
+      co_spawn(io_context_, insecureSession(std::move(socket), 
remote_endpoint.address(), remote_endpoint.port(), local_port), asio::detached);
+    }
   }
 }
 
-asio::awaitable<void> TcpServer::readLoop(auto& socket, const auto& 
remote_address, const auto& local_port) {
+asio::awaitable<void> TcpServer::readLoop(auto& socket, asio::ip::address 
remote_address, asio::ip::port_type remote_port, asio::ip::port_type 
local_port) {
   std::string read_message;
   while (true) {
     auto [read_error, bytes_read] = co_await asio::async_read_until(socket, 
asio::dynamic_buffer(read_message), delimiter_, use_nothrow_awaitable);  // 
NOLINT
@@ -65,7 +68,7 @@ asio::awaitable<void> TcpServer::readLoop(auto& socket, const 
auto& remote_addre
 
     if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
       auto message_str = read_message.substr(0, bytes_read - 
(consume_delimiter_ ? delimiter_.size() : 0));
-      concurrent_queue_.enqueue(Message(std::move(message_str), 
IpProtocol::TCP, remote_address, local_port));
+      concurrent_queue_.enqueue(Message(std::move(message_str), 
IpProtocol::TCP, remote_address, remote_port, local_port));
     } else {
       logger_->log_warn("Queue is full. TCP message ignored.");
     }
@@ -73,8 +76,8 @@ asio::awaitable<void> TcpServer::readLoop(auto& socket, const 
auto& remote_addre
   }
 }
 
-asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket, 
asio::ip::address remote_address, asio::ip::port_type local_port) {
-  co_return co_await readLoop(socket, remote_address, local_port);  // NOLINT
+asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket, 
asio::ip::address remote_address, asio::ip::port_type remote_port, 
asio::ip::port_type local_port) {
+  co_return co_await readLoop(socket, remote_address, remote_port, 
local_port);  // NOLINT
 }
 
 namespace {
@@ -95,7 +98,7 @@ asio::ssl::context setupSslContext(SslServerOptions& 
ssl_data) {
 }
 }  // namespace
 
-asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket, 
asio::ip::address remote_address, asio::ip::port_type local_port) {
+asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket, 
asio::ip::address remote_address, asio::ip::port_type remote_port, 
asio::ip::port_type local_port) {
   gsl_Expects(ssl_data_);
   auto ssl_context = setupSslContext(*ssl_data_);
   SslSocket ssl_socket(std::move(socket), ssl_context);
@@ -104,7 +107,7 @@ asio::awaitable<void> 
TcpServer::secureSession(asio::ip::tcp::socket socket, asi
     logger_->log_warn("Handshake with {} failed due to {}", remote_address, 
handshake_error.message());
     co_return;
   }
-  co_await readLoop(ssl_socket, remote_address, local_port);  // NOLINT
+  co_await readLoop(ssl_socket, remote_address, remote_port, local_port);
 
   asio::error_code ec;
   std::ignore = ssl_socket.lowest_layer().cancel(ec);
diff --git a/extension-framework/src/utils/net/UdpServer.cpp 
b/extension-framework/src/utils/net/UdpServer.cpp
index de0713b3f..35f0933d7 100644
--- a/extension-framework/src/utils/net/UdpServer.cpp
+++ b/extension-framework/src/utils/net/UdpServer.cpp
@@ -15,8 +15,6 @@
  * limitations under the License.
  */
 #include "utils/net/UdpServer.h"
-#include "asio/use_awaitable.hpp"
-#include "asio/detached.hpp"
 #include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
@@ -43,10 +41,11 @@ asio::awaitable<void> UdpServer::doReceive() {
       continue;
     }
     buffer.resize(bytes_received);
-    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
-      concurrent_queue_.enqueue(utils::net::Message(std::move(buffer), 
IpProtocol::UDP, sender_endpoint.address(), socket.local_endpoint().port()));
-    else
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
+      concurrent_queue_.enqueue(utils::net::Message(std::move(buffer), 
IpProtocol::UDP, sender_endpoint.address(), sender_endpoint.port(), 
socket.local_endpoint().port()));
+    } else {
       logger_->log_warn("Queue is full. UDP message ignored.");
+    }
   }
 }
 
diff --git a/extensions/standard-processors/processors/GetTCP.cpp 
b/extensions/standard-processors/processors/GetTCP.cpp
index 5e1ef25ec..ee515662b 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -117,7 +117,7 @@ void GetTCP::notifyStop() {
 void GetTCP::transferAsFlowFile(const utils::net::Message& message, 
core::ProcessSession& session) {
   auto flow_file = session.create();
   session.writeBuffer(flow_file, message.message_data);
-  flow_file->setAttribute(GetTCP::SourceEndpoint.name, fmt::format("{}:{}", 
message.sender_address.to_string(), std::to_string(message.server_port)));
+  flow_file->setAttribute(GetTCP::SourceEndpoint.name, fmt::format("{}:{}", 
message.remote_address.to_string(), std::to_string(message.remote_port)));
   if (message.is_partial)
     session.transfer(flow_file, Partial);
   else
@@ -128,11 +128,12 @@ void GetTCP::onTrigger(core::ProcessContext&, 
core::ProcessSession& session) {
   gsl_Expects(max_batch_size_ > 0);
   size_t logs_processed = 0;
   while (!client_->queueEmpty() && logs_processed < max_batch_size_) {
-    utils::net::Message received_message;
-    if (!client_->tryDequeue(received_message))
+    if (const auto received_message = client_->tryDequeue()) {
+      transferAsFlowFile(received_message.value(), session);
+      ++logs_processed;
+    } else {
       break;
-    transferAsFlowFile(received_message, session);
-    ++logs_processed;
+    }
   }
 }
 
@@ -175,8 +176,8 @@ bool GetTCP::TcpClient::queueEmpty() const {
   return concurrent_queue_.empty();
 }
 
-bool GetTCP::TcpClient::tryDequeue(utils::net::Message& received_message) {
-  return concurrent_queue_.tryDequeue(received_message);
+std::optional<utils::net::Message> GetTCP::TcpClient::tryDequeue() {
+  return concurrent_queue_.tryDequeue();
 }
 
 asio::awaitable<std::error_code> GetTCP::TcpClient::readLoop(auto& socket) {
@@ -203,7 +204,10 @@ asio::awaitable<std::error_code> 
GetTCP::TcpClient::readLoop(auto& socket) {
       continue;
 
     if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
-      utils::net::Message message{read_message.substr(0, bytes_read), 
utils::net::IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), 
socket.lowest_layer().remote_endpoint().port()};
+      const auto remote_endpoint = socket.lowest_layer().remote_endpoint();
+      const auto local_endpoint = socket.lowest_layer().local_endpoint();
+      utils::net::Message message{read_message.substr(0, bytes_read), 
utils::net::IpProtocol::TCP, remote_endpoint.address(),
+        remote_endpoint.port(), local_endpoint.port()};
       if (previous_didnt_end_with_delimiter || 
current_doesnt_end_with_delimiter)
         message.is_partial = true;
       concurrent_queue_.enqueue(std::move(message));
diff --git a/extensions/standard-processors/processors/GetTCP.h 
b/extensions/standard-processors/processors/GetTCP.h
index 4d91b9cd3..7cdfb6496 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -147,14 +147,17 @@ class GetTCP : public core::ProcessorImpl {
         std::optional<size_t> max_message_size,
         std::vector<utils::net::ConnectionId> connections,
         std::shared_ptr<core::logging::Logger> logger);
-
+    TcpClient(const TcpClient&) = delete;
+    TcpClient(TcpClient&&) = delete;
+    TcpClient& operator=(const TcpClient&) = delete;
+    TcpClient& operator=(TcpClient&&) = delete;
     ~TcpClient();
 
     void run();
     void stop();
 
     bool queueEmpty() const;
-    bool tryDequeue(utils::net::Message& received_message);
+    std::optional<utils::net::Message> tryDequeue();
 
    private:
     asio::awaitable<void> doReceiveFrom(const utils::net::ConnectionId& 
connection_id);
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp 
b/extensions/standard-processors/processors/ListenSyslog.cpp
index 83aeae98d..2ef226627 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -95,8 +95,8 @@ void ListenSyslog::transferAsFlowFile(const 
utils::net::Message& message, core::
 
   session.writeBuffer(flow_file, message.message_data);
   flow_file->setAttribute("syslog.protocol", 
std::string{magic_enum::enum_name(message.protocol)});
-  flow_file->setAttribute("syslog.port", std::to_string(message.server_port));
-  flow_file->setAttribute("syslog.sender", message.sender_address.to_string());
+  flow_file->setAttribute("syslog.port", std::to_string(message.local_port));
+  flow_file->setAttribute("syslog.sender", message.remote_address.to_string());
   session.transfer(flow_file, valid ? Success : Invalid);
 }
 
diff --git a/extensions/standard-processors/processors/ListenTCP.cpp 
b/extensions/standard-processors/processors/ListenTCP.cpp
index cc5d2beef..af2fe50ed 100644
--- a/extensions/standard-processors/processors/ListenTCP.cpp
+++ b/extensions/standard-processors/processors/ListenTCP.cpp
@@ -43,8 +43,8 @@ void ListenTCP::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFa
 void ListenTCP::transferAsFlowFile(const utils::net::Message& message, 
core::ProcessSession& session) {
   auto flow_file = session.create();
   session.writeBuffer(flow_file, message.message_data);
-  flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
-  flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
+  flow_file->setAttribute("tcp.port", std::to_string(message.local_port));
+  flow_file->setAttribute("tcp.sender", message.remote_address.to_string());
   session.transfer(flow_file, Success);
 }
 
diff --git a/extensions/standard-processors/processors/ListenUDP.cpp 
b/extensions/standard-processors/processors/ListenUDP.cpp
index d917ddf0d..dcc441bea 100644
--- a/extensions/standard-processors/processors/ListenUDP.cpp
+++ b/extensions/standard-processors/processors/ListenUDP.cpp
@@ -34,8 +34,9 @@ void ListenUDP::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFa
 void ListenUDP::transferAsFlowFile(const utils::net::Message& message, 
core::ProcessSession& session) {
   auto flow_file = session.create();
   session.writeBuffer(flow_file, message.message_data);
-  flow_file->setAttribute("udp.port", std::to_string(message.server_port));
-  flow_file->setAttribute("udp.sender", message.sender_address.to_string());
+  flow_file->setAttribute(ListeningPort.name, 
std::to_string(message.local_port));
+  flow_file->setAttribute(SenderPort.name, 
std::to_string(message.remote_port));
+  flow_file->setAttribute(Sender.name, message.remote_address.to_string());
   session.transfer(flow_file, Success);
 }
 
diff --git a/extensions/standard-processors/processors/ListenUDP.h 
b/extensions/standard-processors/processors/ListenUDP.h
index b6a706ec9..32ad06405 100644
--- a/extensions/standard-processors/processors/ListenUDP.h
+++ b/extensions/standard-processors/processors/ListenUDP.h
@@ -63,9 +63,10 @@ class ListenUDP : public NetworkListenerProcessor {
   EXTENSIONAPI static constexpr auto Success = 
core::RelationshipDefinition{"success", "Messages received successfully will be 
sent out this relationship."};
   EXTENSIONAPI static constexpr auto Relationships = std::array{Success};
 
-  EXTENSIONAPI static constexpr auto PortOutputAttribute = 
core::OutputAttributeDefinition<0>{"udp.port", {}, "The sending port the 
messages were received."};
+  EXTENSIONAPI static constexpr auto ListeningPort = 
core::OutputAttributeDefinition<0>{"udp.port", {}, "The listening port on which 
the messages were received."};
   EXTENSIONAPI static constexpr auto Sender = 
core::OutputAttributeDefinition<0>{"udp.sender", {}, "The sending host of the 
messages."};
-  EXTENSIONAPI static constexpr auto OutputAttributes = 
std::array<core::OutputAttributeReference, 2>{PortOutputAttribute, Sender};
+  EXTENSIONAPI static constexpr auto SenderPort = 
core::OutputAttributeDefinition<0>{"udp.sender.port", {}, "The sending port of 
the messages."};
+  EXTENSIONAPI static constexpr auto OutputAttributes = 
std::to_array<core::OutputAttributeReference>({ListeningPort, Sender, 
SenderPort});
 
   void initialize() override;
   void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& 
session_factory) override;
diff --git 
a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp 
b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
index 94b869879..40621a98b 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
@@ -30,11 +30,12 @@ void 
NetworkListenerProcessor::onTrigger(core::ProcessContext&, core::ProcessSes
   gsl_Expects(max_batch_size_ > 0);
   size_t logs_processed = 0;
   while (!server_->queueEmpty() && logs_processed < max_batch_size_) {
-    utils::net::Message received_message;
-    if (!server_->tryDequeue(received_message))
+    if (const auto received_message = server_->tryDequeue()) {
+      transferAsFlowFile(received_message.value(), session);
+      ++logs_processed;
+    } else {
       break;
-    transferAsFlowFile(received_message, session);
-    ++logs_processed;
+    }
   }
 }
 
diff --git 
a/extensions/standard-processors/processors/NetworkListenerProcessor.h 
b/extensions/standard-processors/processors/NetworkListenerProcessor.h
index fc9d4d7eb..ed2ed550c 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.h
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.h
@@ -18,13 +18,11 @@
 
 #include <memory>
 #include <string>
-#include <utility>
+#include <thread>
 
 #include "core/ProcessorImpl.h"
-#include "minifi-cpp/core/logging/Logger.h"
 #include "minifi-cpp/core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include "minifi-cpp/core/Property.h"
 #include "utils/net/Server.h"
 
 namespace org::apache::nifi::minifi::processors {
@@ -32,6 +30,10 @@ namespace org::apache::nifi::minifi::processors {
 class NetworkListenerProcessor : public core::ProcessorImpl {
  public:
   using ProcessorImpl::ProcessorImpl;
+  NetworkListenerProcessor(const NetworkListenerProcessor&) = delete;
+  NetworkListenerProcessor(NetworkListenerProcessor&&) = delete;
+  NetworkListenerProcessor& operator=(const NetworkListenerProcessor&) = 
delete;
+  NetworkListenerProcessor& operator=(NetworkListenerProcessor&&) = delete;
   ~NetworkListenerProcessor() override;
 
   void onTrigger(core::ProcessContext& context, core::ProcessSession& session) 
override;
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp 
b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index a8dfc1465..b23f954ea 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -273,8 +273,8 @@ TEST_CASE("ListenSyslog without parsing test", 
"[ListenSyslog]") {
       endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 
port);
     }
     protocol = "UDP";
-    CHECK_THAT(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint), 
MatchesSuccess());
-    CHECK_THAT(utils::sendUdpDatagram(invalid_syslog, endpoint), 
MatchesSuccess());
+    CHECK(utils::sendUdpDatagram(rfc5424_logger_example_1, 
endpoint).has_value());
+    CHECK(utils::sendUdpDatagram(invalid_syslog, endpoint).has_value());
   }
 
   SECTION("TCP") {
@@ -329,18 +329,18 @@ TEST_CASE("ListenSyslog with parsing test", 
"[ListenSyslog][NetworkListenerProce
       endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 
port);
     }
 
-    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, 
endpoint), MatchesSuccess());
-    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, 
endpoint), MatchesSuccess());
-    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, 
endpoint), MatchesSuccess());
-    CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, 
endpoint), MatchesSuccess());
+    CHECK(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, 
endpoint).has_value());
+    CHECK(utils::sendUdpDatagram(rfc5424_doc_example_2.unparsed_, 
endpoint).has_value());
+    CHECK(utils::sendUdpDatagram(rfc5424_doc_example_3.unparsed_, 
endpoint).has_value());
+    CHECK(utils::sendUdpDatagram(rfc5424_doc_example_4.unparsed_, 
endpoint).has_value());
 
-    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, 
endpoint), MatchesSuccess());
-    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, 
endpoint), MatchesSuccess());
-    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, 
endpoint), MatchesSuccess());
-    CHECK_THAT(utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, 
endpoint), MatchesSuccess());
+    CHECK(utils::sendUdpDatagram(rfc3164_doc_example_1.unparsed_, 
endpoint).has_value());
+    CHECK(utils::sendUdpDatagram(rfc3164_doc_example_2.unparsed_, 
endpoint).has_value());
+    CHECK(utils::sendUdpDatagram(rfc3164_doc_example_3.unparsed_, 
endpoint).has_value());
+    CHECK(utils::sendUdpDatagram(rfc3164_doc_example_4.unparsed_, 
endpoint).has_value());
 
-    CHECK_THAT(utils::sendUdpDatagram(rfc5424_logger_example_1, endpoint), 
MatchesSuccess());
-    CHECK_THAT(utils::sendUdpDatagram(invalid_syslog, endpoint), 
MatchesSuccess());
+    CHECK(utils::sendUdpDatagram(rfc5424_logger_example_1, 
endpoint).has_value());
+    CHECK(utils::sendUdpDatagram(invalid_syslog, endpoint).has_value());
   }
 
   SECTION("TCP") {
@@ -456,7 +456,7 @@ TEST_CASE("ListenSyslog max queue and max batch size test", 
"[ListenSyslog][Netw
       endpoint = asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), 
port);
     }
     for (auto i = 0; i < 100; ++i) {
-      CHECK_THAT(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, 
endpoint), MatchesSuccess());
+      CHECK(utils::sendUdpDatagram(rfc5424_doc_example_1.unparsed_, 
endpoint).has_value());
     }
     CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message 
ignored.", 50, 300ms, 50ms));
   }
diff --git a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp 
b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
index 10144c63c..c3f4126b2 100644
--- a/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenUDPTests.cpp
@@ -29,10 +29,11 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
 
-void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
+void check_for_attributes(core::FlowFile& flow_file, uint16_t server_port, 
uint16_t remote_port) {
   const auto local_addresses = {"127.0.0.1", "::ffff:127.0.0.1", "::1"};
-  CHECK(std::to_string(port) == flow_file.getAttribute("udp.port"));
+  CHECK(std::to_string(server_port) == flow_file.getAttribute("udp.port"));
   CHECK(ranges::contains(local_addresses, 
flow_file.getAttribute("udp.sender")));
+  CHECK(std::to_string(remote_port) == 
flow_file.getAttribute("udp.sender.port"));
 }
 
 TEST_CASE("ListenUDP test multiple messages", 
"[ListenUDP][NetworkListenerProcessor]") {
@@ -43,7 +44,6 @@ TEST_CASE("ListenUDP test multiple messages", 
"[ListenUDP][NetworkListenerProces
   REQUIRE(listen_udp->setProperty(ListenUDP::MaxBatchSize.name, "2"));
 
   auto port = utils::scheduleProcessorOnRandomPort(controller.plan, 
listen_udp);
-
   asio::ip::udp::endpoint endpoint;
   SECTION("sending through IPv6", "[IPv6]") {
     if (utils::isIPv6Disabled())
@@ -55,16 +55,20 @@ TEST_CASE("ListenUDP test multiple messages", 
"[ListenUDP][NetworkListenerProces
   }
 
   controller.plan->scheduleProcessor(listen_udp);
-  CHECK_THAT(utils::sendUdpDatagram({"test_message_1"}, endpoint), 
MatchesSuccess());
-  CHECK_THAT(utils::sendUdpDatagram({"another_message"}, endpoint), 
MatchesSuccess());
+  const auto test_message_1_result = 
utils::sendUdpDatagram({"test_message_1"}, endpoint);
+  CHECK(test_message_1_result.has_value());
+  const auto another_message_result = 
utils::sendUdpDatagram({"another_message"}, endpoint);
+  CHECK(another_message_result.has_value());
   ProcessorTriggerResult result;
   REQUIRE(controller.triggerUntil({{ListenUDP::Success, 2}}, result, 300ms, 
50ms));
   CHECK(result.at(ListenUDP::Success).size() == 2);
-  CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[0]) == 
"test_message_1");
-  CHECK(controller.plan->getContent(result.at(ListenUDP::Success)[1]) == 
"another_message");
+  const auto test_message_1_flow_file = result.at(ListenUDP::Success)[0];
+  CHECK(controller.plan->getContent(test_message_1_flow_file) == 
"test_message_1");
+  const auto another_message_flow_file = result.at(ListenUDP::Success)[1];
+  CHECK(controller.plan->getContent(another_message_flow_file) == 
"another_message");
 
-  check_for_attributes(*result.at(ListenUDP::Success)[0], port);
-  check_for_attributes(*result.at(ListenUDP::Success)[1], port);
+  check_for_attributes(*test_message_1_flow_file, port, 
test_message_1_result.value().port());
+  check_for_attributes(*another_message_flow_file, port, 
another_message_result.value().port());
 }
 
 TEST_CASE("ListenUDP can be rescheduled", 
"[ListenUDP][NetworkListenerProcessor]") {
@@ -101,7 +105,7 @@ TEST_CASE("ListenUDP max queue and max batch size test", 
"[ListenUDP][NetworkLis
 
   controller.plan->scheduleProcessor(listen_udp);
   for (auto i = 0; i < 100; ++i) {
-    CHECK_THAT(utils::sendUdpDatagram({"test_message"}, endpoint), 
MatchesSuccess());
+    CHECK(utils::sendUdpDatagram({"test_message"}, endpoint).has_value());
   }
 
   CHECK(utils::countLogOccurrencesUntil("Queue is full. UDP message ignored.", 
50, 300ms, 50ms));
diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp 
b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
index 469a1a728..5e1e8a359 100644
--- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
@@ -67,12 +67,13 @@ class CancellableTcpServer : public utils::net::TcpServer {
       }
       std::error_code error;
       auto remote_address = socket.lowest_layer().remote_endpoint(error).address();
+      auto remote_port = socket.lowest_layer().remote_endpoint(error).port();
       auto cancellable_timer = std::make_shared<asio::steady_timer>(io_context_);
       cancellable_timers_.push_back(cancellable_timer);
       if (ssl_data_)
-        co_spawn(io_context_, secureSession(std::move(socket), std::move(remote_address), port_) || wait_until_cancelled(cancellable_timer), asio::detached);
+        co_spawn(io_context_, secureSession(std::move(socket), std::move(remote_address), remote_port, port_) || wait_until_cancelled(cancellable_timer), asio::detached);
       else
-        co_spawn(io_context_, insecureSession(std::move(socket), std::move(remote_address), port_) || wait_until_cancelled(cancellable_timer), asio::detached);
+        co_spawn(io_context_, insecureSession(std::move(socket), std::move(remote_address), remote_port, port_) || wait_until_cancelled(cancellable_timer), asio::detached);
     }
   }
 
@@ -158,9 +159,8 @@ class PutTCPTestFixture {
     auto interval = 10ms;
 
     auto start_time = std::chrono::system_clock::now();
-    utils::net::Message result;
     while (start_time + timeout > std::chrono::system_clock::now()) {
-      if (getServer(port)->tryDequeue(result))
+      if (const auto result = getServer(port)->tryDequeue())
         return result;
       std::this_thread::sleep_for(interval);
     }
@@ -274,7 +274,7 @@ void receive_success(PutTCPTestFixture& test_fixture, const std::string_view exp
   if (received_message) {
     CHECK(received_message->message_data == expected_message);
     CHECK(received_message->protocol == utils::net::IpProtocol::TCP);
-    CHECK(!received_message->sender_address.to_string().empty());
+    CHECK(!received_message->remote_address.to_string().empty());
   }
 }
 
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index 361252aa4..48d035f04 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -36,9 +36,8 @@ namespace org::apache::nifi::minifi::processors {
 namespace {
 std::optional<utils::net::Message> tryDequeueWithTimeout(utils::net::UdpServer& listener, std::chrono::milliseconds timeout = 200ms, std::chrono::milliseconds interval = 10ms) {
   auto start_time = std::chrono::system_clock::now();
-  utils::net::Message result;
   while (start_time + timeout > std::chrono::system_clock::now()) {
-    if (listener.tryDequeue(result))
+    if (const auto result = listener.tryDequeue())
       return result;
     std::this_thread::sleep_for(interval);
   }
@@ -80,7 +79,7 @@ TEST_CASE("PutUDP", "[putudp]") {
     REQUIRE(received_message);
     CHECK(received_message->message_data == message);
     CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
-    CHECK(!received_message->sender_address.to_string().empty());
+    CHECK(!received_message->remote_address.to_string().empty());
   }
 
   {
@@ -94,7 +93,7 @@ TEST_CASE("PutUDP", "[putudp]") {
     REQUIRE(received_message);
     CHECK(received_message->message_data == message);
     CHECK(received_message->protocol == utils::net::IpProtocol::UDP);
-    CHECK(!received_message->sender_address.to_string().empty());
+    CHECK(!received_message->remote_address.to_string().empty());
   }
 
   {
diff --git a/libminifi/test/libtest/unit/TestUtils.cpp b/libminifi/test/libtest/unit/TestUtils.cpp
index b479281e8..c56f44f75 100644
--- a/libminifi/test/libtest/unit/TestUtils.cpp
+++ b/libminifi/test/libtest/unit/TestUtils.cpp
@@ -188,22 +188,26 @@ std::error_code sendMessagesViaTCP(const std::vector<std::string_view>& contents
   return {};
 }
 
-std::error_code sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::expected<asio::ip::udp::endpoint, std::error_code> sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint) {
   asio::io_context io_context;
   asio::ip::udp::socket socket(io_context);
   std::error_code err;
   std::ignore = socket.open(remote_endpoint.protocol(), err);
-  if (err)
-    return err;
+  if (err) {
+    return std::unexpected{err};
+  }
   socket.send_to(content, remote_endpoint, 0, err);
-  return err;
+  if (err) {
+    return std::unexpected{err};
+  }
+  return socket.local_endpoint();
 }
 
-std::error_code sendUdpDatagram(const std::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::expected<asio::ip::udp::endpoint, std::error_code> sendUdpDatagram(const std::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint) {
   return sendUdpDatagram(asio::const_buffer(content.data(), content.size()), remote_endpoint);
 }
 
-std::error_code sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) {
+std::expected<asio::ip::udp::endpoint, std::error_code> sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint) {
   return sendUdpDatagram(asio::buffer(content), remote_endpoint);
 }
 
diff --git a/libminifi/test/libtest/unit/TestUtils.h b/libminifi/test/libtest/unit/TestUtils.h
index d2ca10d83..64af75c7c 100644
--- a/libminifi/test/libtest/unit/TestUtils.h
+++ b/libminifi/test/libtest/unit/TestUtils.h
@@ -170,11 +170,12 @@ bool countLogOccurrencesUntil(const std::string& pattern,
                               const size_t occurrences,
                               const std::chrono::milliseconds max_duration,
                               const std::chrono::milliseconds wait_time = 50ms);
+
 std::error_code sendMessagesViaTCP(const std::vector<std::string_view>& contents, const asio::ip::tcp::endpoint& remote_endpoint, const std::optional<std::string_view> delimiter = std::nullopt);
-std::error_code sendUdpDatagram(const asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint);
+std::expected<asio::ip::udp::endpoint /* local */, std::error_code> sendUdpDatagram(asio::const_buffer content, const asio::ip::udp::endpoint& remote_endpoint);
 
-std::error_code sendUdpDatagram(const std::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint);
-std::error_code sendUdpDatagram(const std::string_view content, const asio::ip::udp::endpoint& remote_endpoint);
+std::expected<asio::ip::udp::endpoint /* local */, std::error_code> sendUdpDatagram(std::span<std::byte const> content, const asio::ip::udp::endpoint& remote_endpoint);
+std::expected<asio::ip::udp::endpoint /* local */, std::error_code> sendUdpDatagram(std::string_view content, const asio::ip::udp::endpoint& remote_endpoint);
 
 bool isIPv6Disabled();
 


Reply via email to