This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a7da4380f71cfe31038966d29487a3d7f7e9833c
Author: Martin Zink <martinz...@apache.org>
AuthorDate: Wed Nov 9 13:28:00 2022 +0100

    MINIFICPP-1934 PutTCP processor
    
    Closes #1419
    Signed-off-by: Marton Szasz <sza...@apache.org>
---
 PROCESSORS.md                                      |  29 +-
 README.md                                          |   7 +-
 cmake/Asio.cmake                                   |   4 +-
 .../standard-processors/processors/PutTCP.cpp      | 581 +++++++++++++++++++++
 extensions/standard-processors/processors/PutTCP.h | 129 +++++
 .../tests/unit/ListenSyslogTests.cpp               |  11 +-
 .../tests/unit/ListenTcpTests.cpp                  |  18 +-
 .../standard-processors/tests/unit/PutTCPTests.cpp | 517 ++++++++++++++++++
 .../standard-processors/tests/unit/PutUDPTests.cpp |   2 +-
 .../tests/unit/resources/alice_by_A.pem            |  46 ++
 .../tests/unit/resources/alice_by_B.pem            |  46 ++
 .../tests/unit/resources/ca_A.crt                  |  21 +
 .../tests/unit/resources/ca_B.crt                  |  21 +
 .../tests/unit/resources/ca_cert.crt               |  20 -
 .../tests/unit/resources/cert_and_private_key.pem  |  46 --
 .../tests/unit/resources/localhost_by_A.pem        |  46 ++
 .../tests/unit/resources/localhost_by_B.pem        |  46 ++
 libminifi/include/utils/StringUtils.h              |   9 +
 libminifi/include/utils/TimeUtil.h                 |   4 +-
 libminifi/src/utils/net/SslServer.cpp              |   1 -
 libminifi/src/utils/net/TcpServer.cpp              |   1 -
 21 files changed, 1513 insertions(+), 92 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 8a0aac5de..d052e5c6c 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -67,6 +67,7 @@
 - [PutSFTP](#putsftp)
 - [PutSplunkHTTP](#putsplunkhttp)
 - [PutSQL](#putsql)
+- [PutTCP](#puttcp)
 - [PutUDP](#putudp)
 - [QueryDatabaseTable](#querydatabasetable)
 - [QuerySplunkIndexingStatus](#querysplunkindexingstatus)
@@ -2167,6 +2168,32 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | success | After a successful SQL update operation, the incoming FlowFile 
sent here |
 
 
+## PutTCP
+
+### Description
+The PutTCP processor receives a FlowFile and transmits the FlowFile content 
over a TCP connection to the configured TCP server. By default, the FlowFiles 
are transmitted over the same TCP connection. To assist the TCP server with 
determining message boundaries, an optional "Outgoing Message Delimiter" string 
can be configured which is appended to the end of each FlowFiles content when 
it is transmitted over the TCP connection. An optional "Connection Per 
FlowFile" parameter can be specif [...]
+
+### Properties
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name                           | Default Value | Allowable Values | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                                                
   |
+|--------------------------------|---------------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Hostname**                   | localhost     |                  | The ip 
address or hostname of the destination.<br/>**Supports Expression Language: 
true**                                                                          
                                                                                
                                                                                
                                                                                
|
+| **Port**                       |               |                  | The port 
or service on the destination.<br/>**Supports Expression Language: true**       
                                                                                
                                                                                
                                                                                
                                                                          |
+| **Idle Connection Expiration** | 15 seconds    |                  | The 
amount of time a connection should be held open without being used before 
closing the connection. A value of 0 seconds will disable this 
feature.<br/>**Supports Expression Language: true**                             
                                                                                
                                                                                
                      |
+| **Timeout**                    | 15 seconds    |                  | The 
timeout for connecting to and communicating with the 
destination.<br/>**Supports Expression Language: true**                         
                                                                                
                                                                                
                                                                                
                          |
+| **Connection Per FlowFile**    | false         |                  | 
Specifies whether to send each FlowFile's content on an individual connection.  
                                                                                
                                                                                
                                                                                
                                                                                
   |
+| Outgoing Message Delimiter     |               |                  | 
Specifies the delimiter to use when sending messages out over the same TCP 
stream. The delimiter is appended to each FlowFile message that is transmitted 
over the stream so that the receiver can determine when one message ends and 
the next message begins. Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.<br/>**Supports Expression 
Language: true** |
+| SSL Context Service            |               |                  | The 
Controller Service to use in order to obtain an SSL Context. If this property 
is set, messages will be sent over a secure connection.                         
                                                                                
                                                                                
                                                                                
 |
+| Max Size of Socket Send Buffer |               |                  | The 
maximum size of the socket send buffer that should be used. This is a 
suggestion to the Operating System to indicate how big the socket buffer should 
be.                                                                             
                                                                                
                                                                                
         |
+
+### Properties
+| Name    | Description                                                        
        |
+|---------|----------------------------------------------------------------------------|
+| success | FlowFiles that are sent to the destination are sent out this 
relationship. |
+| failure | FlowFiles that encountered IO errors are sent out this 
relationship.       |
+
+
 ## PutUDP
 
 ### Description
@@ -2187,7 +2214,7 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | Name    | Description                                                        
           |
 
|---------|-------------------------------------------------------------------------------|
 | success | FlowFiles that are sent to the destination are sent out this 
relationship.    |
-| failure | FlowFiles that encounter IO or network errors are send out this 
relationship. |
+| failure | FlowFiles that encounter IO or network errors are sent out this 
relationship. |
 
 ## QueryDatabaseTable
 
diff --git a/README.md b/README.md
index 0f6a5ab61..dc56009b3 100644
--- a/README.md
+++ b/README.md
@@ -63,9 +63,9 @@ MiNiFi - C++ supports the following C++ processors:
 
 The following table lists the base set of processors.
 
-| Extension Set | Processors                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| **Base**      | 
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListenTCP](PROC
 [...]
+| Extension Set | Processors                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| **Base**      | 
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListenTCP](PROC
 [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. 
Extensions that are enabled by default ( such as CURL ), can be disabled with 
the respective CMAKE flag on the command line.
 
@@ -588,4 +588,3 @@ distributed under the License is distributed on an "AS IS" 
BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-
diff --git a/cmake/Asio.cmake b/cmake/Asio.cmake
index 1953c765c..14c8f9592 100644
--- a/cmake/Asio.cmake
+++ b/cmake/Asio.cmake
@@ -18,8 +18,8 @@
 include(FetchContent)
 
 FetchContent_Declare(asio
-        URL 
https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-22-1.tar.gz
-        URL_HASH 
SHA256=30cb54a5de5e465d10ec0c2026d6b5917f5e89fffabdbabeb1475846fc9a2cf0)
+        URL 
https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-24-0.tar.gz
+        URL_HASH 
SHA256=cbcaaba0f66722787b1a7c33afe1befb3a012b5af3ad7da7ff0f6b8c9b7a8a5b)
 
 FetchContent_GetProperties(asio)
 if(NOT asio_POPULATED)
diff --git a/extensions/standard-processors/processors/PutTCP.cpp 
b/extensions/standard-processors/processors/PutTCP.cpp
new file mode 100644
index 000000000..92b8b020b
--- /dev/null
+++ b/extensions/standard-processors/processors/PutTCP.cpp
@@ -0,0 +1,581 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<tcp::socket>;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The ip address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port or service on the destination.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+    ->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with 
the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+    ->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+    ->withDefaultValue(false)
+    ->isRequired(true)
+    ->supportsExpressionLanguage(false)
+    ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+    ->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+                      "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+                      "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()
+    ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+    ->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+    ->isRequired(false)
+    ->asType<core::DataSizeValue>()
+    ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+constexpr size_t chunk_size = 1024;
+
+PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid)
+    : Processor(name, uuid) {}
+
+PutTCP::~PutTCP() = default;
+
+void PutTCP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+void PutTCP::notifyStop() {}
+
+void PutTCP::onSchedule(core::ProcessContext* const context, 
core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+  }
+  if (context->getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+  }
+  if (auto idle_connection_expiration = 
context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); 
timeout && timeout->getMilliseconds() > 0ms)
+    timeout_ = timeout->getMilliseconds();
+  else
+    timeout_ = 15s;
+
+  std::string context_name;
+  ssl_context_service_.reset();
+  if (context->getProperty(SSLContextService.getName(), context_name) && 
!IsNullOrEmpty(context_name)) {
+    if (auto controller_service = context->getControllerService(context_name)) 
{
+      ssl_context_service_ = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
+      if (!ssl_context_service_)
+        logger_->log_error("%s is not a SSL Context Service", context_name);
+    } else {
+      logger_->log_error("Invalid controller service: %s", context_name);
+    }
+  }
+
+  delimiter_ = 
utils::span_to<std::vector>(gsl::make_span(context->getProperty(OutgoingMessageDelimiter).value_or(std::string{})).as_span<const
 std::byte>());
+
+  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  if (auto max_size_of_socket_send_buffer = 
context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
+    max_size_of_socket_send_buffer_ = 
max_size_of_socket_send_buffer->getValue();
+  else
+    max_size_of_socket_send_buffer_.reset();
+}
+
+namespace {
+template<class SocketType>
+class ConnectionHandler : public ConnectionHandlerBase {
+ public:
+  ConnectionHandler(detail::ConnectionId connection_id,
+                    std::chrono::milliseconds timeout,
+                    std::shared_ptr<core::logging::Logger> logger,
+                    std::optional<size_t> max_size_of_socket_send_buffer,
+                    std::shared_ptr<controllers::SSLContextService> 
ssl_context_service)
+      : connection_id_(std::move(connection_id)),
+        timeout_(timeout),
+        logger_(std::move(logger)),
+        max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
+        ssl_context_service_(std::move(ssl_context_service)) {
+  }
+
+  ~ConnectionHandler() override = default;
+
+  nonstd::expected<void, std::error_code> sendData(const 
std::shared_ptr<io::InputStream>& flow_file_content_stream, const 
std::vector<std::byte>& delimiter) override;
+
+ private:
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
+
+  [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const 
override {
+    return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() - 
dur);
+  }
+
+  void reset() override {
+    last_used_.reset();
+    socket_.reset();
+    io_context_.reset();
+    last_error_.clear();
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+  }
+
+  void checkDeadline(std::error_code error_code, SocketType* socket);
+  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const 
std::shared_ptr<SocketType>& socket);
+
+  void handleConnect(std::error_code error,
+                     tcp::resolver::results_type::iterator endpoint_iter,
+                     const std::shared_ptr<SocketType>& socket);
+  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& 
endpoint_iter,
+                               const std::shared_ptr<SocketType>& socket);
+  void handleHandshake(std::error_code error,
+                       const tcp::resolver::results_type::iterator& 
endpoint_iter,
+                       const std::shared_ptr<SocketType>& socket);
+
+  void handleWrite(std::error_code error,
+                   std::size_t bytes_written,
+                   const std::shared_ptr<io::InputStream>& 
flow_file_content_stream,
+                   const std::vector<std::byte>& delimiter,
+                   const std::shared_ptr<SocketType>& socket);
+
+  void handleDelimiterWrite(std::error_code error, std::size_t bytes_written, 
const std::shared_ptr<SocketType>& socket);
+
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> 
establishConnection(const tcp::resolver::results_type& resolved_query);
+
+  [[nodiscard]] bool hasBeenUsed() const override { return 
last_used_.has_value(); }
+
+  detail::ConnectionId connection_id_;
+  std::optional<std::chrono::steady_clock::time_point> last_used_;
+  asio::io_context io_context_;
+  std::error_code last_error_;
+  asio::steady_timer deadline_{io_context_};
+  std::chrono::milliseconds timeout_;
+  std::shared_ptr<SocketType> socket_;
+
+  std::shared_ptr<core::logging::Logger> logger_;
+  std::optional<size_t> max_size_of_socket_send_buffer_;
+
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+
+  nonstd::expected<tcp::resolver::results_type, std::error_code> 
resolveHostname();
+  nonstd::expected<void, std::error_code> sendDataToSocket(const 
std::shared_ptr<SocketType>& socket,
+                                                           const 
std::shared_ptr<io::InputStream>& flow_file_content_stream,
+                                                           const 
std::vector<std::byte>& delimiter);
+};
+
+template<class SocketType>
+nonstd::expected<void, std::error_code> 
ConnectionHandler<SocketType>::sendData(const std::shared_ptr<io::InputStream>& 
flow_file_content_stream, const std::vector<std::byte>& delimiter) {
+  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& 
socket) { return sendDataToSocket(socket, flow_file_content_stream, delimiter); 
});;
+}
+
+template<class SocketType>
+nonstd::expected<std::shared_ptr<SocketType>, std::error_code> 
ConnectionHandler<SocketType>::getSocket() {
+  if (socket_ && socket_->lowest_layer().is_open())
+    return socket_;
+  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& 
resolved_query) { return establishConnection(resolved_query); });
+  if (!new_socket)
+    return nonstd::make_unexpected(new_socket.error());
+  socket_ = std::move(*new_socket);
+  return socket_;
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::checkDeadline(std::error_code error_code, 
SocketType* socket) {
+  if (error_code != asio::error::operation_aborted) {
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+    last_error_ = asio::error::timed_out;
+    deadline_.async_wait([&](std::error_code error_code) { 
checkDeadline(error_code, socket); });
+    socket->lowest_layer().close();
+  }
+}
+
+template<class SocketType>
+void 
ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator
 endpoint_iter, const std::shared_ptr<SocketType>& socket) {
+  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
+    logger_->log_trace("No more endpoints to try");
+    deadline_.cancel();
+    return;
+  }
+
+  last_error_.clear();
+  deadline_.expires_after(timeout_);
+  deadline_.async_wait([&](std::error_code error_code) -> void {
+    checkDeadline(error_code, socket.get());
+  });
+  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
+      [&socket, endpoint_iter, this](std::error_code err) {
+        handleConnect(err, endpoint_iter, socket);
+      });
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::handleConnect(std::error_code error,
+                                                  
tcp::resolver::results_type::iterator endpoint_iter,
+                                                  const 
std::shared_ptr<SocketType>& socket) {
+  bool connection_failed_before_deadline = error.operator bool();
+  bool connection_failed_due_to_deadline = !socket->lowest_layer().is_open();
+
+  if (connection_failed_due_to_deadline) {
+    core::logging::LOG_TRACE(logger_) << "Connecting to " << 
endpoint_iter->endpoint() << " timed out";
+    socket->lowest_layer().close();
+    return startConnect(++endpoint_iter, socket);
+  }
+
+  if (connection_failed_before_deadline) {
+    core::logging::LOG_TRACE(logger_) << "Connecting to " << 
endpoint_iter->endpoint() << " failed due to " << error.message();
+    last_error_ = error;
+    socket->lowest_layer().close();
+    return startConnect(++endpoint_iter, socket);
+  }
+
+  if (max_size_of_socket_send_buffer_)
+    
socket->lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_));
+
+  handleConnectionSuccess(endpoint_iter, socket);
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::handleHandshake(std::error_code,
+                                                    const 
tcp::resolver::results_type::iterator&,
+                                                    const 
std::shared_ptr<SocketType>&) {
+  throw std::invalid_argument("Handshake called without SSL");
+}
+
+template<>
+void ConnectionHandler<SslSocket>::handleHandshake(std::error_code error,
+                                                   const 
tcp::resolver::results_type::iterator& endpoint_iter,
+                                                   const 
std::shared_ptr<SslSocket>& socket) {
+  if (!error) {
+    core::logging::LOG_TRACE(logger_) << "Successful handshake with " << 
endpoint_iter->endpoint();
+    deadline_.cancel();
+    return;
+  }
+  core::logging::LOG_TRACE(logger_) << "Handshake with " << 
endpoint_iter->endpoint() << " failed due to " << error.message();
+  last_error_ = error;
+  socket->lowest_layer().close();
+  startConnect(std::next(endpoint_iter), socket);
+}
+
+template<>
+void ConnectionHandler<TcpSocket>::handleConnectionSuccess(const 
tcp::resolver::results_type::iterator& endpoint_iter,
+                                                           const 
std::shared_ptr<TcpSocket>& socket) {
+  core::logging::LOG_TRACE(logger_) << "Connected to " << 
endpoint_iter->endpoint();
+  socket->lowest_layer().non_blocking(true);
+  deadline_.cancel();
+}
+
+template<>
+void ConnectionHandler<SslSocket>::handleConnectionSuccess(const 
tcp::resolver::results_type::iterator& endpoint_iter,
+                                                           const 
std::shared_ptr<SslSocket>& socket) {
+  core::logging::LOG_TRACE(logger_) << "Connected to " << 
endpoint_iter->endpoint();
+  socket->async_handshake(asio::ssl::stream_base::client, [this, &socket, 
endpoint_iter](const std::error_code handshake_error) {
+    handleHandshake(handshake_error, endpoint_iter, socket);
+  });
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::handleWrite(std::error_code error,
+                                                std::size_t bytes_written,
+                                                const 
std::shared_ptr<io::InputStream>& flow_file_content_stream,
+                                                const std::vector<std::byte>& 
delimiter,
+                                                const 
std::shared_ptr<SocketType>& socket) {
+  bool write_failed_before_deadline = error.operator bool();
+  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
+
+  if (write_failed_due_to_deadline) {
+    logger_->log_trace("Writing flowfile to socket timed out");
+    socket->lowest_layer().close();
+    deadline_.cancel();
+    return;
+  }
+
+  if (write_failed_before_deadline) {
+    last_error_ = error;
+    logger_->log_trace("Writing flowfile to socket failed due to %s", 
error.message());
+    socket->lowest_layer().close();
+    deadline_.cancel();
+    return;
+  }
+
+  logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", 
bytes_written);
+  if (flow_file_content_stream->size() == flow_file_content_stream->tell()) {
+    asio::async_write(*socket, asio::buffer(delimiter), [&](std::error_code 
error, std::size_t bytes_written) {
+      handleDelimiterWrite(error, bytes_written, socket);
+    });
+  } else {
+    std::vector<std::byte> data_chunk;
+    data_chunk.resize(chunk_size);
+    gsl::span<std::byte> buffer{data_chunk};
+    size_t num_read = flow_file_content_stream->read(buffer);
+    asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const 
std::error_code err, std::size_t bytes_written) {
+      handleWrite(err, bytes_written, flow_file_content_stream, delimiter, 
socket);
+    });
+  }
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::handleDelimiterWrite(std::error_code 
error, std::size_t bytes_written, const std::shared_ptr<SocketType>& socket) {
+  bool write_failed_before_deadline = error.operator bool();
+  bool write_failed_due_to_deadline = !socket->lowest_layer().is_open();
+
+  if (write_failed_due_to_deadline) {
+    logger_->log_trace("Writing delimiter to socket timed out");
+    socket->lowest_layer().close();
+    deadline_.cancel();
+    return;
+  }
+
+  if (write_failed_before_deadline) {
+    last_error_ = error;
+    logger_->log_trace("Writing delimiter to socket failed due to %s", 
error.message());
+    socket->lowest_layer().close();
+    deadline_.cancel();
+    return;
+  }
+
+  logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", 
bytes_written);
+  deadline_.cancel();
+}
+
+
+template<>
+nonstd::expected<std::shared_ptr<TcpSocket>, std::error_code> 
ConnectionHandler<TcpSocket>::establishConnection(const 
tcp::resolver::results_type& resolved_query) {
+  auto socket = std::make_shared<TcpSocket>(io_context_);
+  startConnect(resolved_query.begin(), socket);
+  deadline_.expires_after(timeout_);
+  deadline_.async_wait([&](std::error_code error_code) -> void {
+    checkDeadline(error_code, socket.get());
+  });
+  io_context_.run();
+  if (last_error_)
+    return nonstd::make_unexpected(last_error_);
+  return socket;
+}
+
+asio::ssl::context getSslContext(const auto& ssl_context_service) {
+  gsl_Expects(ssl_context_service);
+  asio::ssl::context ssl_context(asio::ssl::context::sslv23);
+  ssl_context.load_verify_file(ssl_context_service->getCACertificate());
+  ssl_context.set_verify_mode(asio::ssl::verify_peer);
+  if (auto cert_file = ssl_context_service->getCertificateFile(); 
!cert_file.empty())
+    ssl_context.use_certificate_file(cert_file, asio::ssl::context::pem);
+  if (auto private_key_file = ssl_context_service->getPrivateKeyFile(); 
!private_key_file.empty())
+    ssl_context.use_private_key_file(private_key_file, 
asio::ssl::context::pem);
+  ssl_context.set_password_callback([password = 
ssl_context_service->getPassphrase()](std::size_t&, 
asio::ssl::context_base::password_purpose&) { return password; });
+  return ssl_context;
+}
+
+template<>
+nonstd::expected<std::shared_ptr<SslSocket>, std::error_code> 
ConnectionHandler<SslSocket>::establishConnection(const 
tcp::resolver::results_type& resolved_query) {
+  auto ssl_context = getSslContext(ssl_context_service_);
+  auto socket = std::make_shared<SslSocket>(io_context_, ssl_context);
+  startConnect(resolved_query.begin(), socket);
+  deadline_.async_wait([&](std::error_code error_code) -> void {
+    checkDeadline(error_code, socket.get());
+  });
+  io_context_.run();
+  if (last_error_)
+    return nonstd::make_unexpected(last_error_);
+  return socket;
+}
+
+template<class SocketType>
+nonstd::expected<void, std::error_code> 
ConnectionHandler<SocketType>::sendDataToSocket(const 
std::shared_ptr<SocketType>& socket,
+                                                                               
         const std::shared_ptr<io::InputStream>& flow_file_content_stream,
+                                                                               
         const std::vector<std::byte>& delimiter) {
+  if (!socket || !socket->lowest_layer().is_open())
+    return nonstd::make_unexpected(asio::error::not_socket);
+
+  deadline_.expires_after(timeout_);
+  deadline_.async_wait([&](std::error_code error_code) -> void {
+    checkDeadline(error_code, socket.get());
+  });
+  io_context_.restart();
+
+  std::vector<std::byte> data_chunk;
+  data_chunk.resize(chunk_size);
+
+  gsl::span<std::byte> buffer{data_chunk};
+  size_t num_read = flow_file_content_stream->read(buffer);
+  logger_->log_trace("read %zu bytes from flowfile", num_read);
+  asio::async_write(*socket, asio::buffer(data_chunk, num_read), [&](const 
std::error_code err, std::size_t bytes_written) {
+    handleWrite(err, bytes_written, flow_file_content_stream, delimiter, 
socket);
+  });
+  deadline_.async_wait([&](std::error_code error_code) -> void {
+    checkDeadline(error_code, socket.get());
+  });
+  io_context_.run();
+  if (last_error_)
+    return nonstd::make_unexpected(last_error_);
+  last_used_ = std::chrono::steady_clock::now();
+  return {};
+}
+
+template<class SocketType>
+nonstd::expected<tcp::resolver::results_type, std::error_code> 
ConnectionHandler<SocketType>::resolveHostname() {
+  tcp::resolver resolver(io_context_);
+  std::error_code error_code;
+  auto resolved_query = resolver.resolve(connection_id_.getHostname(), 
connection_id_.getPort(), error_code);
+  if (error_code)
+    return nonstd::make_unexpected(error_code);
+  return resolved_query;
+}
+}  // namespace
+
+void PutTCP::onTrigger(core::ProcessContext* context, core::ProcessSession* 
const session) {
+  gsl_Expects(context && session);
+
+  const auto flow_file = session->get();
+  if (!flow_file) {
+    yield();
+    return;
+  }
+
+  removeExpiredConnections();
+
+  auto hostname = context->getProperty(Hostname, 
flow_file).value_or(std::string{});
+  auto port = context->getProperty(Port, flow_file).value_or(std::string{});
+  if (hostname.empty() || port.empty()) {
+    logger_->log_error("[%s] invalid target endpoint: hostname: %s, port: %s", 
flow_file->getUUIDStr(),
+        hostname.empty() ? "(empty)" : hostname.c_str(),
+        port.empty() ? "(empty)" : port.c_str());
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  auto flow_file_content_stream = session->getFlowFileContentStream(flow_file);
+  if (!flow_file_content_stream) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  auto connection_id = detail::ConnectionId(std::move(hostname), 
std::move(port));
+  std::shared_ptr<ConnectionHandlerBase> handler;
+  if (!connections_ || !connections_->contains(connection_id)) {
+    if (ssl_context_service_)
+      handler = std::make_shared<ConnectionHandler<SslSocket>>(connection_id, 
timeout_, logger_, max_size_of_socket_send_buffer_, ssl_context_service_);
+    else
+      handler = std::make_shared<ConnectionHandler<TcpSocket>>(connection_id, 
timeout_, logger_, max_size_of_socket_send_buffer_, nullptr);
+    if (connections_)
+      (*connections_)[connection_id] = handler;
+  } else {
+    handler = (*connections_)[connection_id];
+  }
+
+  gsl_Expects(handler);
+
+  processFlowFile(handler, flow_file_content_stream, *session, flow_file);
+}
+
+void PutTCP::removeExpiredConnections() {
+  if (connections_) {
+    std::erase_if(*connections_, [this](auto& item) -> bool {
+      const auto& connection_handler = item.second;
+      return (!connection_handler || (idle_connection_expiration_ && 
!connection_handler->hasBeenUsedIn(*idle_connection_expiration_)));
+    });
+  }
+}
+
+void PutTCP::processFlowFile(std::shared_ptr<ConnectionHandlerBase>& 
connection_handler,
+                             const std::shared_ptr<io::InputStream>& 
flow_file_content_stream,
+                             core::ProcessSession& session,
+                             const std::shared_ptr<core::FlowFile>& flow_file) 
{
+  auto result = connection_handler->sendData(flow_file_content_stream, 
delimiter_);
+
+  if (!result && connection_handler->hasBeenUsed()) {
+    logger_->log_warn("%s with reused connection, retrying...", 
result.error().message());
+    connection_handler->reset();
+    result = connection_handler->sendData(flow_file_content_stream, 
delimiter_);
+  }
+
+  const auto transfer_to_success = [&session, &flow_file]() -> void {
+    session.transfer(flow_file, Success);
+  };
+
+  const auto transfer_to_failure = [&session, &flow_file, &logger = logger_, 
&connection_handler](std::error_code ec) -> void {
+    gsl_Expects(ec);
+    connection_handler->reset();
+    logger->log_error("%s", ec.message());
+    session.transfer(flow_file, Failure);
+  };
+
+  result | utils::map(transfer_to_success) | 
utils::orElse(transfer_to_failure);
+}
+
+REGISTER_RESOURCE(PutTCP, Processor);
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/PutTCP.h 
b/extensions/standard-processors/processors/PutTCP.h
new file mode 100644
index 000000000..4b8999e0f
--- /dev/null
+++ b/extensions/standard-processors/processors/PutTCP.h
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <utility>
+
+#include "io/InputStream.h"
+#include "Processor.h"
+#include "utils/Export.h"
+#include "controllers/SSLContextService.h"
+
+#include "utils/expected.h"
+#include "utils/StringUtils.h"  // for string <=> on libc++
+
+namespace org::apache::nifi::minifi::processors::detail {
+
+class ConnectionId {
+ public:
+  ConnectionId(std::string hostname, std::string port) : 
hostname_(std::move(hostname)), port_(std::move(port)) {}
+
+  auto operator<=>(const ConnectionId&) const = default;
+
+  [[nodiscard]] std::string_view getHostname() const { return hostname_; }
+  [[nodiscard]] std::string_view getPort() const { return port_; }
+
+ private:
+  std::string hostname_;
+  std::string port_;
+};
+}  // namespace org::apache::nifi::minifi::processors::detail
+
+namespace std {
+template <>
+struct hash<org::apache::nifi::minifi::processors::detail::ConnectionId> {
+  size_t operator()(const 
org::apache::nifi::minifi::processors::detail::ConnectionId& connection_id) 
const {
+    return org::apache::nifi::minifi::utils::hash_combine(
+        std::hash<std::string_view>{}(connection_id.getHostname()),
+        std::hash <std::string_view>{}(connection_id.getPort()));
+  }
+};
+}  // namespace std
+
+namespace org::apache::nifi::minifi::processors {
+class ConnectionHandlerBase {
+ public:
+  virtual ~ConnectionHandlerBase() = default;
+
+  [[nodiscard]] virtual bool hasBeenUsed() const = 0;
+  [[nodiscard]] virtual bool hasBeenUsedIn(std::chrono::milliseconds dur) 
const = 0;
+  virtual nonstd::expected<void, std::error_code> sendData(const 
std::shared_ptr<io::InputStream>& flow_file_content_stream, const 
std::vector<std::byte>& delimiter) = 0;
+  virtual void reset() = 0;
+};
+
+class PutTCP final : public core::Processor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "The PutTCP 
processor receives a FlowFile and transmits the FlowFile content over a TCP 
connection to the configured TCP server. "
+      "By default, the FlowFiles are transmitted over the same TCP connection. 
To assist the TCP server with determining message boundaries, "
+      "an optional \"Outgoing Message Delimiter\" string can be configured 
which is appended to the end of each FlowFiles content when it is transmitted 
over the TCP connection. "
+      "An optional \"Connection Per FlowFile\" parameter can be specified to 
change the behaviour so that each FlowFiles content is transmitted over a 
single TCP connection "
+      "which is closed after the FlowFile has been sent.";
+  EXTENSIONAPI static const core::Property Hostname;
+  EXTENSIONAPI static const core::Property Port;
+  EXTENSIONAPI static const core::Property IdleConnectionExpiration;
+  EXTENSIONAPI static const core::Property Timeout;
+  EXTENSIONAPI static const core::Property ConnectionPerFlowFile;
+  EXTENSIONAPI static const core::Property OutgoingMessageDelimiter;
+  EXTENSIONAPI static const core::Property SSLContextService;
+  EXTENSIONAPI static const core::Property MaxSizeOfSocketSendBuffer;
+
+  static auto properties() { return std::array{Hostname, Port, 
IdleConnectionExpiration, Timeout, ConnectionPerFlowFile, 
OutgoingMessageDelimiter, SSLContextService, MaxSizeOfSocketSendBuffer}; }
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+  static auto relationships() { return std::array{Success, Failure}; }
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  explicit PutTCP(const std::string& name, const utils::Identifier& uuid = {});
+  PutTCP(const PutTCP&) = delete;
+  PutTCP& operator=(const PutTCP&) = delete;
+  ~PutTCP() final;
+
+  void initialize() final;
+  void notifyStop() final;
+  void onSchedule(core::ProcessContext*, core::ProcessSessionFactory *) final;
+  void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
+
+ private:
+  void removeExpiredConnections();
+  void processFlowFile(std::shared_ptr<ConnectionHandlerBase>& 
connection_handler,
+                       const std::shared_ptr<io::InputStream>& 
flow_file_content_stream,
+                       core::ProcessSession& session,
+                       const std::shared_ptr<core::FlowFile>& flow_file);
+
+  std::vector<std::byte> delimiter_;
+  std::optional<std::unordered_map<detail::ConnectionId, 
std::shared_ptr<ConnectionHandlerBase>>> connections_;
+  std::optional<std::chrono::milliseconds> idle_connection_expiration_;
+  std::optional<size_t> max_size_of_socket_send_buffer_;
+  std::chrono::milliseconds timeout_ = std::chrono::seconds(15);
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<PutTCP>::getLogger();
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp 
b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index 302af6c3a..1de8165c0 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -495,11 +495,11 @@ TEST_CASE("Test ListenSyslog via TCP with SSL 
connection", "[ListenSyslog]") {
   auto ssl_context_service = 
controller.plan->addController("SSLContextService", "SSLContextService");
   const auto executable_dir = 
minifi::utils::file::FileUtils::get_executable_dir();
   REQUIRE(controller.plan->setProperty(ssl_context_service, 
controllers::SSLContextService::CACertificate.getName(),
-    minifi::utils::file::concat_path(executable_dir, 
"resources/ca_cert.crt")));
+      minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
   REQUIRE(controller.plan->setProperty(ssl_context_service, 
controllers::SSLContextService::ClientCertificate.getName(),
-    minifi::utils::file::concat_path(executable_dir, 
"resources/cert_and_private_key.pem")));
+      minifi::utils::file::concat_path(executable_dir, 
"resources/localhost_by_A.pem")));
   REQUIRE(controller.plan->setProperty(ssl_context_service, 
controllers::SSLContextService::PrivateKey.getName(),
-    minifi::utils::file::concat_path(executable_dir, 
"resources/cert_and_private_key.pem")));
+      minifi::utils::file::concat_path(executable_dir, 
"resources/localhost_by_A.pem")));
   LogTestController::getInstance().setTrace<ListenSyslog>();
   REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, 
std::to_string(SYSLOG_PORT)));
   REQUIRE(listen_syslog->setProperty(ListenSyslog::MaxBatchSize, "2"));
@@ -508,8 +508,9 @@ TEST_CASE("Test ListenSyslog via TCP with SSL connection", 
"[ListenSyslog]") {
   REQUIRE(listen_syslog->setProperty(ListenSyslog::SSLContextService, 
"SSLContextService"));
   ssl_context_service->enable();
   controller.plan->scheduleProcessor(listen_syslog);
-  REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, 
minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
-  REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, 
minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
+  REQUIRE(utils::sendMessagesViaSSL({rfc5424_logger_example_1}, endpoint, 
minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
+  REQUIRE(utils::sendMessagesViaSSL({invalid_syslog}, endpoint, 
minifi::utils::file::concat_path(executable_dir, "/resources/ca_A.crt")));
+
   std::unordered_map<core::Relationship, 
std::vector<std::shared_ptr<core::FlowFile>>> result;
   REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 
50ms));
   CHECK(controller.plan->getContent(result.at(ListenSyslog::Success)[0]) == 
rfc5424_logger_example_1);
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp 
b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
index 247f2e6ff..e8c1f9286 100644
--- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -119,11 +119,11 @@ TEST_CASE("Test ListenTCP with SSL connection", 
"[ListenTCP]") {
   LogTestController::getInstance().setTrace<ListenTCP>();
   const auto executable_dir = 
minifi::utils::file::FileUtils::get_executable_dir();
   REQUIRE(controller.plan->setProperty(ssl_context_service, 
controllers::SSLContextService::CACertificate.getName(),
-    minifi::utils::file::concat_path(executable_dir, 
"resources/ca_cert.crt")));
+      minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
   REQUIRE(controller.plan->setProperty(ssl_context_service, 
controllers::SSLContextService::ClientCertificate.getName(),
-    minifi::utils::file::concat_path(executable_dir, 
"resources/cert_and_private_key.pem")));
+      minifi::utils::file::concat_path(executable_dir, 
"resources/localhost_by_A.pem")));
   REQUIRE(controller.plan->setProperty(ssl_context_service, 
controllers::SSLContextService::PrivateKey.getName(),
-    minifi::utils::file::concat_path(executable_dir, 
"resources/cert_and_private_key.pem")));
+      minifi::utils::file::concat_path(executable_dir, 
"resources/localhost_by_A.pem")));
   REQUIRE(controller.plan->setProperty(ssl_context_service, 
controllers::SSLContextService::Passphrase.getName(), "Password12"));
   REQUIRE(controller.plan->setProperty(listen_tcp, ListenTCP::Port.getName(), 
std::to_string(PORT)));
   REQUIRE(controller.plan->setProperty(listen_tcp, 
ListenTCP::MaxBatchSize.getName(), "2"));
@@ -159,7 +159,7 @@ TEST_CASE("Test ListenTCP with SSL connection", 
"[ListenTCP]") {
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message: expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, 
minifi::utils::file::concat_path(executable_dir, "resources/ca_cert.crt")));
+      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, 
minifi::utils::file::concat_path(executable_dir, "resources/ca_A.crt")));
     }
   }
 
@@ -190,14 +190,14 @@ TEST_CASE("Test ListenTCP with SSL connection", 
"[ListenTCP]") {
     controller.plan->scheduleProcessor(listen_tcp);
 
     minifi::utils::net::SslData ssl_data;
-    ssl_data.ca_loc = minifi::utils::file::FileUtils::get_executable_dir() + 
"/resources/ca_cert.crt";
-    ssl_data.cert_loc = minifi::utils::file::FileUtils::get_executable_dir() + 
"/resources/cert_and_private_key.pem";
-    ssl_data.key_loc = minifi::utils::file::FileUtils::get_executable_dir() + 
"/resources/cert_and_private_key.pem";
+    ssl_data.ca_loc = minifi::utils::file::FileUtils::get_executable_dir() + 
"/resources/ca_A.crt";
+    ssl_data.cert_loc = minifi::utils::file::FileUtils::get_executable_dir() + 
"/resources/localhost_by_A.pem";
+    ssl_data.key_loc = minifi::utils::file::FileUtils::get_executable_dir() + 
"/resources/localhost_by_A.pem";
     ssl_data.key_pw = "Password12";
 
     expected_successful_messages = {"test_message_1", "another_message"};
     for (const auto& message : expected_successful_messages) {
-      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, 
minifi::utils::file::FileUtils::get_executable_dir() + 
"/resources/ca_cert.crt", ssl_data));
+      REQUIRE(utils::sendMessagesViaSSL({message}, endpoint, 
minifi::utils::file::FileUtils::get_executable_dir() + "/resources/ca_A.crt", 
ssl_data));
     }
   }
 
@@ -214,7 +214,7 @@ TEST_CASE("Test ListenTCP with SSL connection", 
"[ListenTCP]") {
     ssl_context_service->enable();
     controller.plan->scheduleProcessor(listen_tcp);
 
-    REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, 
minifi::utils::file::concat_path(executable_dir, "/resources/ca_cert.crt")));
+    REQUIRE_FALSE(utils::sendMessagesViaSSL({"test_message_1"}, endpoint, 
minifi::utils::file::concat_path(executable_dir, "/resources/ca_A.crt")));
   }
 
   ProcessorTriggerResult result;
diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp 
b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
new file mode 100644
index 000000000..6398d2bc0
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
@@ -0,0 +1,517 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SessionType>
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+    std::lock_guard lock_guard{mutex_};
+    return sessions_.size();
+  }
+
+  void closeSessions() override {
+    std::lock_guard lock_guard{mutex_};
+    for (const auto& session_weak : sessions_) {
+      if (auto session = session_weak.lock()) {
+        auto& socket = session->getSocket();
+        if (socket.is_open()) {
+          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+          session->getSocket().close();
+        }
+      }
+    }
+  }
+
+  mutable std::mutex mutex_;
+  std::vector<std::weak_ptr<SessionType>> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public 
SessionAwareServer<TcpSession> {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr<TcpSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = TcpServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public 
SessionAwareServer<SslSession> {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr<SslSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = SslServer::createSession();
+    logger_->log_trace("SessionAwareSslServer::createSession %p", 
session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = 
minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources" / "ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / "resources" / 
"localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / "resources" / 
"localhost_by_A.pem").string();
+  return ssl_data;
+}
+}  // namespace
+
+class PutTCPTestFixture {
+ public:
+  PutTCPTestFixture() {
+    LogTestController::getInstance().setTrace<PutTCP>();
+    LogTestController::getInstance().setInfo<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<utils::net::Server>();
+    put_tcp_->setProperty(PutTCP::Hostname, "${literal('localhost')}");
+    put_tcp_->setProperty(PutTCP::Timeout, "200 ms");
+    put_tcp_->setProperty(PutTCP::OutgoingMessageDelimiter, "\n");
+  }
+
+  ~PutTCPTestFixture() {
+    stopServers();
+  }
+
+  void stopServers() {
+    for (auto& [port, server] : listeners_) {
+      auto& listener = server.listener_;
+      auto& server_thread = server.server_thread_;
+      if (listener)
+        listener->stop();
+      if (server_thread.joinable())
+        server_thread.join();
+      listener.reset();
+    }
+  }
+
+  size_t getNumberOfActiveSessions(std::optional<uint16_t> port = 
std::nullopt) {
+    if (auto session_aware_listener = 
dynamic_cast<ISessionAwareServer*>(getListener(port))) {
+      return session_aware_listener->getNumberOfSessions() - 1;  // There is 
always one inactive session waiting for a new connection
+    }
+    return -1;
+  }
+
+  void closeActiveConnections() {
+    for (auto& [port, server] : listeners_) {
+      if (auto session_aware_listener = 
dynamic_cast<ISessionAwareServer*>(server.listener_.get())) {
+        session_aware_listener->closeSessions();
+      }
+    }
+    std::this_thread::sleep_for(200ms);
+  }
+
+  auto trigger(std::string_view message, std::unordered_map<std::string, 
std::string> input_flow_file_attributes = {}) {
+    return controller_.trigger(message, std::move(input_flow_file_attributes));
+  }
+
+  auto getContent(const auto& flow_file) {
+    return controller_.plan->getContent(flow_file);
+  }
+
+  std::optional<utils::net::Message> 
tryDequeueReceivedMessage(std::optional<uint16_t> port = std::nullopt) {
+    auto timeout = 200ms;
+    auto interval = 10ms;
+
+    auto start_time = std::chrono::system_clock::now();
+    utils::net::Message result;
+    while (start_time + timeout > std::chrono::system_clock::now()) {
+      if (getListener(port)->tryDequeue(result))
+        return result;
+      std::this_thread::sleep_for(interval);
+    }
+    return std::nullopt;
+  }
+
+  void addSSLContextToPutTCP(const std::filesystem::path& ca_cert, const 
std::optional<std::filesystem::path>& client_cert_key) {
+    const std::filesystem::path ca_dir = 
std::filesystem::path(minifi::utils::file::FileUtils::get_executable_dir()) / 
"resources";
+    auto ssl_context_service_node = 
controller_.plan->addController("SSLContextService", "SSLContextService");
+    REQUIRE(controller_.plan->setProperty(ssl_context_service_node, 
SSLContextService::CACertificate.getName(), (ca_dir / ca_cert).string()));
+    if (client_cert_key) {
+      REQUIRE(controller_.plan->setProperty(ssl_context_service_node, 
SSLContextService::ClientCertificate.getName(), (ca_dir / 
*client_cert_key).string()));
+      REQUIRE(controller_.plan->setProperty(ssl_context_service_node, 
SSLContextService::PrivateKey.getName(), (ca_dir / *client_cert_key).string()));
+    }
+    ssl_context_service_node->enable();
+
+    put_tcp_->setProperty(PutTCP::SSLContextService, "SSLContextService");
+  }
+
+  void setHostname(const std::string& hostname) {
+    REQUIRE(controller_.plan->setProperty(put_tcp_, 
PutTCP::Hostname.getName(), hostname));
+  }
+
+  void enableConnectionPerFlowFile() {
+    REQUIRE(controller_.plan->setProperty(put_tcp_, 
PutTCP::ConnectionPerFlowFile.getName(), "true"));
+  }
+
+  void setIdleConnectionExpiration(const std::string& 
idle_connection_expiration_str) {
+    REQUIRE(controller_.plan->setProperty(put_tcp_, 
PutTCP::IdleConnectionExpiration.getName(), idle_connection_expiration_str));
+  }
+
+  uint16_t addTCPServer() {
+    uint16_t port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 
1}(random_engine_);
+    listeners_[port].startTCPServer(port);
+    return port;
+  }
+
+  uint16_t addSSLServer() {
+    uint16_t port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 
1}(random_engine_);
+    listeners_[port].startSSLServer(port);
+    return port;
+  }
+
+  void setPutTCPPort(uint16_t port) {
+    put_tcp_->setProperty(PutTCP::Port, 
utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
+  }
+
+  void setPutTCPPort(std::string port_str) {
+    put_tcp_->setProperty(PutTCP::Port, std::move(port_str));
+  }
+
+  [[nodiscard]] uint16_t getSinglePort() const {
+    gsl_Expects(listeners_.size() == 1);
+    return listeners_.begin()->first;
+  }
+
+ private:
+  utils::net::Server* getListener(std::optional<uint16_t> port) {
+    if (!port)
+      port = getSinglePort();
+    return listeners_.at(*port).listener_.get();
+  }
+
+  const std::shared_ptr<PutTCP> put_tcp_ = std::make_shared<PutTCP>("PutTCP");
+  test::SingleProcessorTestController controller_{put_tcp_};
+
+  std::mt19937 random_engine_{std::random_device{}()};  // NOLINT: "Missing 
space before {  [whitespace/braces] [5]"
+  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding 
to those
+
+  class Server {
+   public:
+    Server() = default;
+
+    void startTCPServer(uint16_t port) {
+      gsl_Expects(!listener_ && !server_thread_.joinable());
+      listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port, 
core::logging::LoggerFactory<utils::net::Server>::getLogger());
+      server_thread_ = std::thread([this]() { listener_->run(); });
+    }
+
+    void startSSLServer(uint16_t port) {
+      gsl_Expects(!listener_ && !server_thread_.joinable());
+      listener_ = std::make_unique<SessionAwareSslServer>(std::nullopt,
+                                                          port,
+                                                          
core::logging::LoggerFactory<utils::net::Server>::getLogger(),
+                                                          
createSslDataForServer(),
+                                                          
utils::net::SslServer::ClientAuthOption::REQUIRED);
+      server_thread_ = std::thread([this]() { listener_->run(); });
+    }
+
+    std::unique_ptr<utils::net::Server> listener_;
+    std::thread server_thread_;
+  };
+  std::unordered_map<uint16_t, Server> listeners_;
+};
+
+void trigger_expect_success(PutTCPTestFixture& test_fixture, const 
std::string_view message, std::unordered_map<std::string, std::string> 
input_flow_file_attributes = {}) {
+  const auto result = test_fixture.trigger(message, 
std::move(input_flow_file_attributes));
+  const auto& success_flow_files = result.at(PutTCP::Success);
+  CHECK(success_flow_files.size() == 1);
+  CHECK(result.at(PutTCP::Failure).empty());
+  if (!success_flow_files.empty())
+    CHECK(test_fixture.getContent(success_flow_files[0]) == message);
+}
+
+void trigger_expect_failure(PutTCPTestFixture& test_fixture, const 
std::string_view message) {
+  const auto result = test_fixture.trigger(message);
+  const auto &failure_flow_files = result.at(PutTCP::Failure);
+  CHECK(failure_flow_files.size() == 1);
+  CHECK(result.at(PutTCP::Success).empty());
+  if (!failure_flow_files.empty())
+    CHECK(test_fixture.getContent(failure_flow_files[0]) == message);
+}
+
+void receive_success(PutTCPTestFixture& test_fixture, const std::string_view 
expected_message, std::optional<uint16_t> port = std::nullopt) {
+  auto received_message = test_fixture.tryDequeueReceivedMessage(port);
+  CHECK(received_message);
+  if (received_message) {
+    CHECK(received_message->message_data == expected_message);
+    CHECK(received_message->protocol == utils::net::IpProtocol::TCP);
+    CHECK(!received_message->sender_address.to_string().empty());
+  }
+}
+
+constexpr std::string_view first_message = "message 1";
+constexpr std::string_view second_message = "message 22";
+constexpr std::string_view third_message = "message 333";
+constexpr std::string_view fourth_message = "message 4444";
+constexpr std::string_view fifth_message = "message 55555";
+constexpr std::string_view sixth_message = "message 666666";
+
+TEST_CASE("Server closes in-use socket", "[PutTCP]") {
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+    auto port = test_fixture.addTCPServer();
+    test_fixture.setPutTCPPort(port);
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+    auto port = test_fixture.addSSLServer();
+    test_fixture.setPutTCPPort(port);
+  }
+
+  trigger_expect_success(test_fixture, first_message);
+  trigger_expect_success(test_fixture, second_message);
+  trigger_expect_success(test_fixture, third_message);
+
+  receive_success(test_fixture, first_message);
+  receive_success(test_fixture, second_message);
+  receive_success(test_fixture, third_message);
+
+  CHECK(1 == test_fixture.getNumberOfActiveSessions());
+
+  test_fixture.closeActiveConnections();
+
+  trigger_expect_success(test_fixture, fourth_message);
+  trigger_expect_success(test_fixture, fifth_message);
+  trigger_expect_success(test_fixture, sixth_message);
+
+  test_fixture.tryDequeueReceivedMessage();
+
+  CHECK(LogTestController::getInstance().matchesRegex("warning.*with reused 
connection, retrying"));
+  CHECK(2 == test_fixture.getNumberOfActiveSessions());
+}
+
+TEST_CASE("Connection per flow file", "[PutTCP]") {
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+    auto port = test_fixture.addTCPServer();
+    test_fixture.setPutTCPPort(port);
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+    auto port = test_fixture.addSSLServer();
+    test_fixture.setPutTCPPort(port);
+  }
+
+  test_fixture.enableConnectionPerFlowFile();
+
+  trigger_expect_success(test_fixture, first_message);
+  trigger_expect_success(test_fixture, second_message);
+  trigger_expect_success(test_fixture, third_message);
+
+  receive_success(test_fixture, first_message);
+  receive_success(test_fixture, second_message);
+  receive_success(test_fixture, third_message);
+
+  trigger_expect_success(test_fixture, fourth_message);
+  trigger_expect_success(test_fixture, fifth_message);
+  trigger_expect_success(test_fixture, sixth_message);
+
+  receive_success(test_fixture, fourth_message);
+  receive_success(test_fixture, fifth_message);
+  receive_success(test_fixture, sixth_message);
+
+  CHECK(6 == test_fixture.getNumberOfActiveSessions());
+}
+
+TEST_CASE("PutTCP test invalid host", "[PutTCP]") {
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+  }
+
+  test_fixture.setPutTCPPort(1235);
+  test_fixture.setHostname("invalid_hostname");
+  trigger_expect_failure(test_fixture, "message for invalid host");
+
+  CHECK((LogTestController::getInstance().contains("Host not found", 0ms)
+      || LogTestController::getInstance().contains("No such host is known", 
0ms)));
+}
+
+TEST_CASE("PutTCP test invalid server", "[PutTCP]") {
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+  }
+  test_fixture.setPutTCPPort(1235);
+  test_fixture.setHostname("localhost");
+  trigger_expect_failure(test_fixture, "message for invalid server");
+
+  CHECK((LogTestController::getInstance().contains("Connection refused", 0ms)
+      || LogTestController::getInstance().contains("No connection could be 
made because the target machine actively refused it", 0ms)
+      || LogTestController::getInstance().contains("A connection attempt 
failed because the connected party did not properly respond", 0ms)));
+}
+
+TEST_CASE("PutTCP test non-routable server", "[PutTCP]") {
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+  }
+  test_fixture.setHostname("192.168.255.255");
+  test_fixture.setPutTCPPort(1235);
+  trigger_expect_failure(test_fixture, "message for non-routable server");
+
+  CHECK((LogTestController::getInstance().contains("Connection timed out", 0ms)
+    || LogTestController::getInstance().contains("Operation timed out", 0ms)
+    || LogTestController::getInstance().contains("host has failed to respond", 
0ms)));
+}
+
+TEST_CASE("PutTCP test invalid server cert", "[PutTCP]") {
+  PutTCPTestFixture test_fixture;
+
+  test_fixture.addSSLContextToPutTCP("ca_B.crt", "alice_by_B.pem");
+  test_fixture.setHostname("localhost");
+  auto port = test_fixture.addSSLServer();
+  test_fixture.setPutTCPPort(port);
+
+  trigger_expect_failure(test_fixture, "message for invalid-cert server");
+
+  CHECK((LogTestController::getInstance().contains("certificate verify 
failed", 0ms)
+      || LogTestController::getInstance().contains("asio.ssl error", 0ms)));
+}
+
+TEST_CASE("PutTCP test missing client cert", "[PutTCP]") {
+  PutTCPTestFixture test_fixture;
+
+  test_fixture.addSSLContextToPutTCP("ca_A.crt", std::nullopt);
+  test_fixture.setHostname("localhost");
+  auto port = test_fixture.addSSLServer();
+  test_fixture.setPutTCPPort(port);
+
+  trigger_expect_failure(test_fixture, "message for invalid-cert server");
+
+  CHECK((LogTestController::getInstance().contains("sslv3 alert handshake 
failure", 0ms)
+      || LogTestController::getInstance().contains("asio.ssl error", 0ms)));}
+
+TEST_CASE("PutTCP test idle connection expiration", "[PutTCP]") {
+  PutTCPTestFixture test_fixture;
+
+  SECTION("No SSL") {
+    auto port = test_fixture.addTCPServer();
+    test_fixture.setPutTCPPort(port);
+  }
+  SECTION("SSL") {
+    auto port = test_fixture.addSSLServer();
+    test_fixture.setPutTCPPort(port);
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+  }
+
+  test_fixture.setIdleConnectionExpiration("100ms");
+  trigger_expect_success(test_fixture, first_message);
+  std::this_thread::sleep_for(110ms);
+  trigger_expect_success(test_fixture, second_message);
+
+  receive_success(test_fixture, first_message);
+  receive_success(test_fixture, second_message);
+
+  CHECK(2 == test_fixture.getNumberOfActiveSessions());
+}
+
+TEST_CASE("PutTCP test long flow file chunked sending", "[PutTCP]") {
+  PutTCPTestFixture test_fixture;
+  SECTION("No SSL") {
+    auto port = test_fixture.addTCPServer();
+    test_fixture.setPutTCPPort(port);
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+    auto port = test_fixture.addSSLServer();
+    test_fixture.setPutTCPPort(port);
+  }
+  std::string long_message(3500, 'a');
+  trigger_expect_success(test_fixture, long_message);
+  receive_success(test_fixture, long_message);
+}
+
+TEST_CASE("PutTCP test multiple servers", "[PutTCP]") {
+  PutTCPTestFixture test_fixture;
+  size_t number_of_servers = 5;
+  std::vector<uint16_t> ports;
+  SECTION("No SSL") {
+    for (size_t i = 0; i < number_of_servers; ++i) {
+      ports.push_back(test_fixture.addTCPServer());
+    }
+  }
+  SECTION("SSL") {
+    test_fixture.addSSLContextToPutTCP("ca_A.crt", "alice_by_A.pem");
+    for (size_t i = 0; i < number_of_servers; ++i) {
+      ports.push_back(test_fixture.addSSLServer());
+    }
+  }
+
+  test_fixture.setPutTCPPort("${tcp_port}");
+
+  for (auto i = 0; i < 3; ++i) {
+    for (auto& port : ports) {
+      std::string message = "Test message ";
+      message.append(std::to_string(port));
+      trigger_expect_success(test_fixture, message, {{"tcp_port", 
std::to_string(port)}});
+      receive_success(test_fixture, message, port);
+    }
+  }
+  for (auto& port : ports) {
+    CHECK(1 == test_fixture.getNumberOfActiveSessions(port));
+  }
+}
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp 
b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index fd313d928..ca63f5da5 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -57,7 +57,7 @@ TEST_CASE("PutUDP", "[putudp]") {
   put_udp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
   put_udp->setProperty(PutUDP::Port, 
utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
 
-  utils::net::UdpServer listener{std::nullopt, port, 
core::logging::LoggerFactory<utils::net::UdpServer>().getLogger()};
+  utils::net::UdpServer listener{std::nullopt, port, 
core::logging::LoggerFactory<utils::net::UdpServer>::getLogger()};
 
   auto server_thread = std::thread([&listener]() { listener.run(); });
   auto cleanup_server = gsl::finally([&]{
diff --git a/extensions/standard-processors/tests/unit/resources/alice_by_A.pem 
b/extensions/standard-processors/tests/unit/resources/alice_by_A.pem
new file mode 100644
index 000000000..605fe9366
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/alice_by_A.pem
@@ -0,0 +1,46 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEogIBAAKCAQEAqhZR+Hsx397YYy2sQOI0IkxO6rkJvftLrjRpy1YwVfArimkU
+umZkWdpE7FAt1BIkzBlSsqXzeY/+W53YjOBcLK+xrQHpIquTGG6iL4btM6hWPBow
+hxuz0TpW5SpsuupQbRi4hbWVuQzTCKV68VM01/590Su2l0MPoamSMthK8H2ubodz
+R1VFwlTaLZmRJ20hyowsuKLOdc8fyzDXH5JPR9+STHsPpl+OccDvTG8iKlOZMa8z
+d3GXWBhSfcPgP+WzJWLn1bVN7UbKfgneUYSRAvf+ocsT1OZ7T+eam07ROsZBgpN1
+VVycmFalRqsNddT814tUIgkRXEsXzC1bP/eV2QIDAQABAoIBAH7O50xHpRaQkWnY
+Gm3BeDb+B3ROgqnm2jTGFP4pgx3/Uqb90xtpzXWEGxDIcnKDGHYmhxZ0TYMbTPtH
+QrU9bNtQHjqriwJzQtbbXQXsJZr27Vwf9oA0sirSwQhYSfpNSasc3C2sBTWTDx+K
+KJAVhfdnYKx7V8WMlPHld/96bNzA0AqGgn8FGYDiPAdiY7Ega6/iEMgtwGgIe56c
+k5YeaXOeV9b5gZGFZyDXcnbAgC24gcuSI70YmoYgSBYJC1NftLEa4NLysetpEA2A
+de1kQxm54ZfeeC+whtT693jIvp8cg6Ck/yCNj+qGXFbBWjibojs/uN8PadBOl8DU
+hdGrG2UCgYEA0tCiEfaMykaZn4GTmUq+drx2l5eP6GBhmkCpw5b0AS7Xpi/sh1Hm
++v36+ffVdPsYylVDMCCLxrugx+pkCwk2I+fHxRJ0tGGOCBclpQqE0mgYQdBmGyMQ
+hsXea+9IhbbeGkqgxxHWGHzVPtq7NFOLSt6LGF6+RXhfHcaBq/ypFccCgYEAzor2
+nhQr0q6UpaVUmZp3fliVfhsv7qQEcsCXbAssLbKM36vyD8m9A6V94QLfL/Z6jBx7
+3Edh18OrN0dfIlcF3J8jiD/3mGiGGldsF/dgJWQB8DpP51tYldR7ni1bW+ZDtlX7
+XmcWKTGZLqXqzYS8bsCcKUOLd2g//p1mnbcfd18CgYA8cG4Wok3I7Ca88SREzYX/
+epaxbVVntMImvCUvmwaHlEtlLNYuEZAcI1ah9aiv6hE4aOtjT+Fi74Xv6sYV1+U6
+tAe0+06ULGfQ7/nt8C8WN5vEup+bZhkl2nKjFS4Aj+XrObwQdo+f46IrbABBxzXn
+GBheu0LnndP/MFsa2MwNHwKBgEmAFkcm8nlk+yz/at3GpGNn7rsTvbj00Uhs1PXz
+++K/OXaXX+rSZdsYV3VtajNNSUr3D/TRyjXYQePIGEjGIyXh0+k2qkuoVqClH6hf
+te1Ya4AroCe60AlxthQSHALWLJ6EdpGfqbk7F0IMdURxygS3slrU2JrDlJJtPQk/
+E4mNAoGAJjht0RIhJLr+Gss+7SocEYFd8klFpRTWpx80pN5hRdUfH84Q3OOXnq2F
+QoHD7WLMM8Ec/paSSZjrmvk00Ptp4s2/Z1SRhY2BQjbk32xP0/CkGGK8SPVW6Sb7
+hAol6soYGroGcCGsRPdRE6hF5+BH8VYGh5vPlELyDnNym7Kp0wU=
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIDITCCAgkCFDA8Dkntpi2PSSJDZGYjjG03qNboMA0GCSqGSIb3DQEBCwUAMFQx
+CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UECgwNRXhhbXBsZSwgSW5j
+LjEgMB4GA1UEAwwXQ2VydGlmaWNhdGUgQXV0aG9yaXR5IEEwHhcNMjIwOTE5MDkw
+NTM3WhcNMzIwOTE2MDkwNTM3WjBGMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0Ex
+FjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xEjAQBgNVBAMMCWFsaWNlLmNvbTCCASIw
+DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKoWUfh7Md/e2GMtrEDiNCJMTuq5
+Cb37S640actWMFXwK4ppFLpmZFnaROxQLdQSJMwZUrKl83mP/lud2IzgXCyvsa0B
+6SKrkxhuoi+G7TOoVjwaMIcbs9E6VuUqbLrqUG0YuIW1lbkM0wilevFTNNf+fdEr
+tpdDD6GpkjLYSvB9rm6Hc0dVRcJU2i2ZkSdtIcqMLLiiznXPH8sw1x+ST0ffkkx7
+D6ZfjnHA70xvIipTmTGvM3dxl1gYUn3D4D/lsyVi59W1Te1Gyn4J3lGEkQL3/qHL
+E9Tme0/nmptO0TrGQYKTdVVcnJhWpUarDXXU/NeLVCIJEVxLF8wtWz/3ldkCAwEA
+ATANBgkqhkiG9w0BAQsFAAOCAQEAR2vpt91QSLfoh0qIW+bknV+ZilZdgRGh+kXm
+deqo+Drkz3BgmbXCIG6GGWF6LaS+iNt5YYyHUBKqLkvAfwtocLSVgNKYcgqG3kLZ
+qfoLrlT/IhHQ7WE6NOFQKcoJ/vuBMU7zjROjbbw2NdkO7hpJr2NQC5CgfPy89eJ6
+ly7wf3zxsVHk8fUnl1MgSb4lft4v5E73s9SpfRkKYr2BrkMCHQYawRAm9um9pW2S
+Qmk1L6OKkkCoR+LYrLyWY3s84NGVjP/fk7XHtvh2YtlB3sT4/yluOc8kXTp0DiuW
+UyvWTGYI+hHvZ4ol8LOttlV4Nwo4d8qgyuHlgiw1dnQBUZB7/g==
+-----END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/alice_by_B.pem 
b/extensions/standard-processors/tests/unit/resources/alice_by_B.pem
new file mode 100644
index 000000000..f8e1328e0
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/alice_by_B.pem
@@ -0,0 +1,46 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAqq6fA21gSE4zJvfEwJ2hTa0tTfONsWLIfho0jNKvTytwoD12
+tj41mPdM8kWvYY/oEPQ8WCXAy4UmgwxxWvCmtDmm/NXT5B6/IMr+png82sdkPABY
+qoPxIb0lvRoSQ0DIYZhRLI1v6TdAFMHUVyksSihoaDZZ7/6Ne+0p0xHTqiyTrlgX
+axMCvmS1S/i6eCUfNKtBA3Y0nXLj1D24RYn7gxPKnAXMpCwoxW1+zRFA9SotyaO4
+mh3cp4MFALxdimu69q5/rduK9XlAXvqxlCb80mJ3he4TEio2J2KL+kiFksGKtj7W
+e8a6UgIzOYYTmyZ4ScYGnwvSxYeAkQX0oRPzbwIDAQABAoIBAA5aRsbcALez07tY
+JHRqDPFiOagPbf/XpbJs87RP1ywaJAtlf8ENdCZbzV2mHHxgxIwAbb6f1hmHJdjp
+R/L0v5/yJSent3y8VSglycon3D4tfDFLeilElRdYN38yXQzIutDyJQlRD3MWEU5t
+ijSWIsJNqZHx2BhFWJJuPBEis6DgoZXUIQLMCNDUPTofJS486yoYxdLhMC+i4+6q
+DnQ+k7a32tPhSJGKXVCeIbGA2gJFcxoRQvFuJuMCofCC1jJ1BlGKbaeal0RC9ICe
+TSk+SPmZMGMiOcLXBcJTCRxQP71mqtxn4cYJdvQDmXP+vZyVWBXB272YRwSoTbTL
+yKHsn4ECgYEA0/1V4tuOCfCP94Phk0zwaepeZxeGdRTQhum57V5L3xmlgkrrUx+5
+u6JzrMrxpO8h7l619jHPPaV0u0BE1bwjGeOPuPA9WCWqfzETDL/j0NbTOWRLnX5h
+91Ag8BYoGxt4gK856S/Me7PRPljNWiv4bDjGjXrYq/GDpSU9ZPnW9y8CgYEAzh3o
+Gz9XGEI9EjcwCZtuPoKmalvXFC2tNNkkX6FgZRire0mekJzKjbNgR7xrWYtUaZ55
+ZTEGL9bseRSOcfWeZBpJIclEIgyajYs8tW8RaTZoRQmu3fP+2IlFoU9SiSQR40hX
+7eeJOduHhbxXGW/JKuf7pBbGMSTH9n+MqfV/t8ECgYEAhEmmDABYvekp3hqlbOdp
+a57+tDShCnUnv9kg1niuvhViDFG2UlQM8oNozh6C9xrnQLpHsM/adKzIkIWFrx9N
+hD1WleENVvGCWQcFzUH953f3revhp/GTLuMI+unIs0nMQ/mVGOhkIZnP7Kk71JZ1
+2wr/FJDhn0MClM8NZfLm668CgYAYkbANL2uuVJb7COENBB4MDX7QxsnIeflfh1Ky
+o4XeBybJt2jTTB1I7szXQDp7ngQd4uoNid527WOauzyPkPukaw20nU0l0eLKZIKE
+Dg1BQV8Ee7cAdgk2voYySEZKWqZXNVRl42eUIfrxkhW/zndoSebRFHXjfcLoOyQF
+TmI/AQKBgEKHOHbemvkKtdE4GPz436Y9vodqOMe2qFAoW+nhOP4stGZhZpOyVMvk
+Y7PxpG+1bwPVg4ouSJyoq9Mw3Qnz5Reuot1h67rrw75CyUSDwuAdLpnS08t7T7LF
+N7b5nWOMcbUdaCaiAh3Mv8/9vo46ZdpdiFYgDLOCYiiLifva36rK
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIDITCCAgkCFDYWRAThkKd/J+oMW7tZBqEPx4XPMA0GCSqGSIb3DQEBCwUAMFQx
+CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UECgwNRXhhbXBsZSwgSW5j
+LjEgMB4GA1UEAwwXQ2VydGlmaWNhdGUgQXV0aG9yaXR5IEIwHhcNMjIwOTE5MDkw
+NTM3WhcNMzIwOTE2MDkwNTM3WjBGMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0Ex
+FjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xEjAQBgNVBAMMCWFsaWNlLmNvbTCCASIw
+DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKqunwNtYEhOMyb3xMCdoU2tLU3z
+jbFiyH4aNIzSr08rcKA9drY+NZj3TPJFr2GP6BD0PFglwMuFJoMMcVrwprQ5pvzV
+0+QevyDK/qZ4PNrHZDwAWKqD8SG9Jb0aEkNAyGGYUSyNb+k3QBTB1FcpLEooaGg2
+We/+jXvtKdMR06osk65YF2sTAr5ktUv4unglHzSrQQN2NJ1y49Q9uEWJ+4MTypwF
+zKQsKMVtfs0RQPUqLcmjuJod3KeDBQC8XYpruvauf63bivV5QF76sZQm/NJid4Xu
+ExIqNidii/pIhZLBirY+1nvGulICMzmGE5smeEnGBp8L0sWHgJEF9KET828CAwEA
+ATANBgkqhkiG9w0BAQsFAAOCAQEAgc0RbVwCNVpCZjUyhVxBlqrS1S0K8ygdyVPG
+7/fcejKSA7aUEA4x5pehvNwhDHXnW9jiEdWbQLyJaNFyuQT/4R8tCZi0q6nQF7NN
+shL0B19QaHErSPHYudecshbB7VrsiYjG9Q3O8QMrulfLcz3b6RLqUTLCOSK7Nclk
+Nv+ONad80OCzjBUIOnIHzkfDRDChzsF90EGtyLtIXaUO/K7WZDlw6+Gf9rVtyH6S
+USyUzcKVDobxCcJkmlRbmSL3oExTQCukhH6aNhUyvUFtlaKEXqizFe+/ujtE0Ymg
+sAJWmEYct+9H6iINgq24kPn/h29EbvYcGWIEg20U8+AznHaMPQ==
+-----END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/ca_A.crt 
b/extensions/standard-processors/tests/unit/resources/ca_A.crt
new file mode 100644
index 000000000..3d283a6c0
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/ca_A.crt
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDizCCAnOgAwIBAgIUca6kHRI3RSvFxnz4ksg2M33A3IowDQYJKoZIhvcNAQEL
+BQAwVDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRYwFAYDVQQKDA1FeGFtcGxl
+LCBJbmMuMSAwHgYDVQQDDBdDZXJ0aWZpY2F0ZSBBdXRob3JpdHkgQTAgFw0yMjA5
+MTkwOTA1MzZaGA8yMDUyMDkxMTA5MDUzNlowVDELMAkGA1UEBhMCVVMxCzAJBgNV
+BAgMAkNBMRYwFAYDVQQKDA1FeGFtcGxlLCBJbmMuMSAwHgYDVQQDDBdDZXJ0aWZp
+Y2F0ZSBBdXRob3JpdHkgQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
+AKsd1Yd8ds19WNU9ag6oDfgyxGzfnm4Nl8SrG34nA9D3DB7yXu3yN2feVPBNqDCU
+FnOCsQqiRKOROfNPEfz8fqqOqUcd8TK1RNg3JWtbOjy+BklBqm8NK3fdDkrD+Fuq
+sKOdqho5Xuy36Ec4y4citEW7FcRdu9LrAr81NbcOG0AU7a+SRdiROVUmSIDhQhDP
+j37HO9Rya6DizNSTIvQ4xQ/iQTzGqdZD9wy/AUQt+E7VrTslpIi48dWSjM6mZkGA
+1TcfAeDjJa7HrbnIZkvRhH5tUiHzCbQq+8N5SkSFssP8wd++8rydD0gWjxkOIHtR
+SGPoq5cp5uKAq4j7DXasneECAwEAAaNTMFEwHQYDVR0OBBYEFH6swKPx3nUFLoaW
+WiiwhERbyC5AMB8GA1UdIwQYMBaAFH6swKPx3nUFLoaWWiiwhERbyC5AMA8GA1Ud
+EwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBACepoYKN+U5mqP5R6s/6/CM4
+3iBgcVRwAWNLd+cMhGzbSMvQbwji9AvE4lUxoLULIRl9EeedlKuEv01Eic4RGMq4
+1hG8mn3mSjITqQKYS+2o3sIKqtnfR86uQzQyixTTGiKJzpPV6vzQgtvkniCTPlgI
+eu59pNfQLUlYrgtJ+lTv/2/MPyS2I137DsjG+7ASVbDZ6uDbEp1/KyrgJB1skB+6
+s2Pxicf9X8mpfpuTqFiGyJUOdHmgYpx6ZxyAgMCm4C+a5e8I283d0xX06coChy00
+fh23THQ9O8HVQYejzHFfCoshIkj9l0Kkw6Um2aS4KLaZxAky+Kn+wgqFbgcJY0s=
+-----END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/ca_B.crt 
b/extensions/standard-processors/tests/unit/resources/ca_B.crt
new file mode 100644
index 000000000..5beee660a
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/ca_B.crt
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDizCCAnOgAwIBAgIUIRojQbIHUpmTeT1hp7BsxG8gFDswDQYJKoZIhvcNAQEL
+BQAwVDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRYwFAYDVQQKDA1FeGFtcGxl
+LCBJbmMuMSAwHgYDVQQDDBdDZXJ0aWZpY2F0ZSBBdXRob3JpdHkgQjAgFw0yMjA5
+MTkwOTA1MzZaGA8yMDUyMDkxMTA5MDUzNlowVDELMAkGA1UEBhMCVVMxCzAJBgNV
+BAgMAkNBMRYwFAYDVQQKDA1FeGFtcGxlLCBJbmMuMSAwHgYDVQQDDBdDZXJ0aWZp
+Y2F0ZSBBdXRob3JpdHkgQjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
+AJfR48sziDgKH8+PfcVUMWag7ctGoBXgfOZ8h5gubQm5KrTD0rqHvf/8dLGvJ0aq
+tKVYnoFFjHikDIJZxuMYF6Vbq39FNinZugMCQsJ3gTWREq7tr3MLDfN+lD9rCxAr
+RDbfwaXN907ljXbsNoq3km9Bd43qAxDDND5N74o9wefFLLxcNo08d5aTN3LZY9g0
+b83ps+kc9Ysm9JBzFN10DJYIWwRWvCZL6hX10fWqrV9OcJgilCQk5PJgaZBppQgi
+hiTjq36vlBCTL3RO2MXecPSJLfigwKkT4WZwrG1E26jhh0lGVK6pdOs99JeTtzfr
+hC4lR8ExD8wFwvcn/8jFXxsCAwEAAaNTMFEwHQYDVR0OBBYEFDDrW/6QNEfo/7Br
+fpz8Vmm7KjWvMB8GA1UdIwQYMBaAFDDrW/6QNEfo/7Brfpz8Vmm7KjWvMA8GA1Ud
+EwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAErDWgtX/547fwHv6+sOoSGT
+FHmVXba0IlcfW404r1Kp0MeNYZitQvi4SHL6CkxY+/RdgppxkbE5D8hUhk9PrSeo
+5E4ebPsnvSRlzDVWLm2ZpbLRO6BcrzbEN0b3ylZ3Kw/+SarDNBMSTDWZ5knWXUk6
+3Ckg2gg8VCLKxQK0IDTHtXq+WTKGmVf34dYbLfWHUnYr1DLUsxgnX4llHm3xOrzp
+ZqvW5cEdlj6+SW1azQgbFrEeWH7ebK5E4GBQ8LhRWbIpo6g2kzaGKTkijrk9agMs
+ByzjRdLitbwt07VNE9cNDVv0kC1PLZcz1TgNnaOl5CABqw0yMjLO5LEXUK4BYkU=
+-----END CERTIFICATE-----
diff --git a/extensions/standard-processors/tests/unit/resources/ca_cert.crt 
b/extensions/standard-processors/tests/unit/resources/ca_cert.crt
deleted file mode 100644
index e801553c9..000000000
--- a/extensions/standard-processors/tests/unit/resources/ca_cert.crt
+++ /dev/null
@@ -1,20 +0,0 @@
------BEGIN CERTIFICATE-----
-MIIDNDCCAhwCCQDoXhDkdH/BBjANBgkqhkiG9w0BAQsFADBcMQswCQYDVQQGEwJV
-UzELMAkGA1UECAwCQ0ExFjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xKDAmBgNVBAMM
-H0dvb2QgUm9vdCBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMTkwNzEyMDk1MjUz
-WhcNNDkwNzA0MDk1MjUzWjBcMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExFjAU
-BgNVBAoMDUV4YW1wbGUsIEluYy4xKDAmBgNVBAMMH0dvb2QgUm9vdCBDZXJ0aWZp
-Y2F0ZSBBdXRob3JpdHkwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDG
-t5qp++4NAO83uASsVx7xRc3YS1Ss6La2opJTeXSnnsL6d+eLIUZrO6R/vofjLMPb
-qHisnQXAtl560d/XPBXm/ydp2IBLJQJW9aRxa/zqcf4tDTdBLKXYHhqKSQDJGS78
-vOuNuhf6T+p1guqnLYxwlRp6V8DMY/nC5n+IgByr9Jp2QtqJceH5WdyABVauqtMo
-LKXdbhfU6lDZ1XIZNeoKY8u2s34UQLUvOGaP/FzYHvKev1KzFF/nR3+svK8cvxXM
-EuqHM5tdtIp1ugjvR66PUIT00HoT00wS6VIpBdHq/8uXJeY77lr52xyVdk282tlw
-wr9/W0AGXjVMW3O+VRhFAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAE5HYjHkh3fI
-qakhENGL5PSszmOz5yQRrggP2ZJeEAoFjy5fbf/zUPIPMgMa0qM4QI+2C0iGlem1
-c1MCGNk5BiDPWMaUjppYmPZWkXzYu9Nl1dizXYidcnTiiBTROkpMij2fzCErymx9
-CmYxfeFyeJ5uAHSWSOGCfvlxi0vHvHn/+5rm0eqHcGP2c9ivW/SC/6RCXnHuIS9O
-O/UHrQPQe7YmdBgCHw4K4UHLZkYPH6osMPdII09PbZBB1TgrogbuA6TMp9NU6LrX
-WNN3nhFaVVjEb8tawMabfG9PU/1PKGRuNdaLsYb3IXhT0I/SWobD3MJ9xcO9sAhv
-QKZuUQf4ntI=
------END CERTIFICATE-----
diff --git 
a/extensions/standard-processors/tests/unit/resources/cert_and_private_key.pem 
b/extensions/standard-processors/tests/unit/resources/cert_and_private_key.pem
deleted file mode 100644
index e4891c58b..000000000
--- 
a/extensions/standard-processors/tests/unit/resources/cert_and_private_key.pem
+++ /dev/null
@@ -1,46 +0,0 @@
------BEGIN RSA PRIVATE KEY-----
-MIIEpQIBAAKCAQEAtdEk8MHL8RFiOuHz/hfyjEe3PxumlNnubzn7u6EdJpiLhOLs
-cWcvQAbYvbsyI6duGzKh1FRNmS8+Q/gp/36vmssZy96B4K+axiG7WBqHfjPJbjfa
-NHbz3D7D7i36r/Vu/pXsBMPb/DSJ978maY3oikSB9906bx73XJHAx3RLxgLHl4po
-vt20SDOY9R2Klcbew4pDXuUpRdEK0h1+fdZkYU92YiAz2O86eYMvwgorCX9tBrcw
-sih/cYYiTQ9DMZ3DPJ0HfDH5T5gIPH/+5/YVHaTBdthhIZ13UNY/X8XcOVO/nlzC
-3/MtBiTxKj4zIyqncbClm4BqzW3S5hxUQXR32QIDAQABAoIBAAXoCmwrz4VATFGf
-X37EpmN6PPC25D13qvBAEPZycHD9iaLCgG3arUVGM6pON33DBaeqiGlOZ8rvJvWs
-TSj4o5nCuU7PJqb27W88T0q4aehmpEeJVvRXXOqtu020fq1Sqs1ob2dkOXRC/Kxo
-sEXDj2dWfGZh8HEFr4F5VqrkE0YWaQLaNHf9g6vAuOtlNMnhu5iM7mNq0qQi4Qg0
-zmOpEyAK5obhPEa8eYgjuWUeuul342wpMEGaFqD3lr4rnhcESZtm/S37L6lJ24UF
-SIBPzuEjEDlthlh3tqgKyQFsHvcMN4XN4850J/nMoX8jDcvnV7iYYFykHEb6y6FZ
-+ZlftNECgYEA2hS+s7yer2bQGw3LsKqwpNmpckLeAb84JEbra6i7A7SPUMEYwjOm
-Q2ePpz6ZBVDs7qODBgMV2g9a9GCbSzgV7SeQ0367dCPugONckVQvMVU5wPtSkFKF
-8jLn66+6YK64ASTqLLVd8TkU3bdByWcsh3JTR/lmDwlSeGjbm1OETj0CgYEA1W41
-Pi8ZbGDrpc9/CnyFCLMaipq9cAm4n/M58CVf0ogxeAXShIcDboTfv7lqpnqM2vg7
-sSRjyHz0++5VZTNSnQDlLIndQHQ6NKKC1tb0zNKlRuL2gwMHwMmWLqCjbLsqSP5E
-lHEM8fn2EVAMiKRod01kOY7OnUnPSSgMD7QvJc0CgYEAyWqZi0WtRhDuKd5+/0dW
-6JqDrp1lkDV887xwmLl5KH3uU8ZUSKENcXnHqs7c45UPj4SDcd0NpJ3EAqrrIvjE
-/4kocL2/AhBhqrbS+wLGp4iwU7WLVvJw9fXgT8S4na0hEyV2Bx7nifCPfgtQfmSF
-Mv/7PSFyCncwrTcjhP0I2H0CgYEAkTNbEaUlXLBLYRDbUx0HvLVstyMzAgf7DQaC
-QjiLCkYRsZ/0aqkX0pafSmYwgnYZYddDdO5W3Ez2tnacri7OY3X6c+SPG4x3FNwC
-u3qeLMKaIrHCF7t2CNicTbiHti9XQzWJHpwSvITbvUeCX2vKjm+eYfIf6q4OUazn
-F7/z23kCgYEA0r2N20DkEN9uD281SZbjAmkjYMMvUkDyChk2oWnODZcFkEhuaGM9
-0TBw1vjpoO67H/tSNEUziXMNLtnH4p5gRzP84ZGpMPxe4MK6pIwbDxhDyPlLCwOG
-dYnaYyWqsiMbqtv6LbMh/uatuMpCzqJEQZz80xDNUp3k90muk3+0M4w=
------END RSA PRIVATE KEY-----
------BEGIN CERTIFICATE-----
-MIIDJTCCAg0CCQDclmfqcI6z1DANBgkqhkiG9w0BAQsFADBcMQswCQYDVQQGEwJV
-UzELMAkGA1UECAwCQ0ExFjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xKDAmBgNVBAMM
-H0dvb2QgUm9vdCBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMTkwNzEyMDk1MjUz
-WhcNMjkwNzA5MDk1MjUzWjBNMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExFjAU
-BgNVBAoMDUV4YW1wbGUsIEluYy4xGTAXBgNVBAMMEGdvb2QuZXhhbXBsZS5jb20w
-ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC10STwwcvxEWI64fP+F/KM
-R7c/G6aU2e5vOfu7oR0mmIuE4uxxZy9ABti9uzIjp24bMqHUVE2ZLz5D+Cn/fq+a
-yxnL3oHgr5rGIbtYGod+M8luN9o0dvPcPsPuLfqv9W7+lewEw9v8NIn3vyZpjeiK
-RIH33TpvHvdckcDHdEvGAseXimi+3bRIM5j1HYqVxt7DikNe5SlF0QrSHX591mRh
-T3ZiIDPY7zp5gy/CCisJf20GtzCyKH9xhiJND0MxncM8nQd8MflPmAg8f/7n9hUd
-pMF22GEhnXdQ1j9fxdw5U7+eXMLf8y0GJPEqPjMjKqdxsKWbgGrNbdLmHFRBdHfZ
-AgMBAAEwDQYJKoZIhvcNAQELBQADggEBAIh6k/epw3dWtRuMwXxjqEobi/RD/8Nk
-52kX6x8WTcnglrSzPSvkhnfR5PQ9whY2Zbw0aVdenejlGZEi8cAxwmJbN4NIhQwW
-FjHYYQA0MPgFGq/4XFT9E49aS212+ivUBRoPlWfw7QmCdGq3z6eQGfVtIGGLfSGH
-cvnC9Z4VdY0RJrnzgRKd7iq/RW66u3Uyg1fdOKCp9F5PSwwl+6dPgKO84muWjRi4
-9y+htcXSboEtYQy/ncul0MeJ8fGTY1YEG2QUolmCKeJ8a2e6SHcX0Unu+6tAD8sK
-fjpZAOI1lgRrhrIKhi3Rx0aCkhayhvvScDQL0ODA5ciCu5EJHRhWCbg=
------END CERTIFICATE-----
diff --git 
a/extensions/standard-processors/tests/unit/resources/localhost_by_A.pem 
b/extensions/standard-processors/tests/unit/resources/localhost_by_A.pem
new file mode 100644
index 000000000..a40251478
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/localhost_by_A.pem
@@ -0,0 +1,46 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEA1/8IgNpuq16MNaqaHNSCucyl0NbIFavnX+nOl+3Yu9/lHvQ6
+HY4PEU1Ma9tDOL2VON1PrFx+tJ1CTY6RDn5Ppj6pOpIeIxx1sGkxvJVeMhsmwojB
+3jQUJE6rgrUvsUr1YzvbJwwtfgr+PJD4uxS8V58kQdblNcWGZT8BMzracb3btNFn
+2n62JuXRKTUTwXxk9PEcYJdeWkUlu309dTvt8ipeygLMxUwP+oiRjTB5lDA4VydH
+qYWBi0iVPgcn5Qcxnxl3nVyTEHPszs/8fsCsqkxkhTmkyBeFAeDMoIxNDVIz92C0
+uzt2zURCBv0EOB0oIig9TTn1M27O421dYGAoIwIDAQABAoIBAHplUVs66/V9+TO7
+/eKSZZWFqvyhiPYG2HDYW7JqHCOyJvKYcIoo4s7qH4EK2ZfAjluPxUMlksMkTdsH
+C5nL57SL03eWLy+0Q9h4c6+qcJsyGY+o0TrqBfPhBH1n0KPFlzHpTDFfTDQdZJ3L
+hLb2dBeu3WvVq0MCMDsVLcfq9Lf4VIimKnufYai97p174IPovYLAhfhJiSrAVb+z
+eBBvpiBalFlZc46HaeC0pZrD4k5e5B/5J45b3KeCmmcWvjhfxaJfNAUKIK+fPtZO
+1txrN6+lEZBl3EpfUFWxfMAH73tPVUabO6Ap5OGVK2ahvBQw0UDqBMRM6o6LdX+9
+WKgcOhECgYEA74Zn61Qz2R+J1WRBlZYBAcm/w669B9Mwc/itLyOG77G8UTfqHgkp
+vjCNGEtG6doztmkaN50OuTp3/2iStLdFm1IDfJmaslZFHwsFSX9YeLPoUy3P93ri
+ePrsnnmdKqX1WBovm53kv4bqI9yY8OApD7s7inaMrX9PRbcWg60Q/KsCgYEA5tpU
+GGZEQ7R7m5/R5r5+4uZh9enoD0MDEkg0jm06N8pzk6CEzlynYwPHohkWF58C2lo2
+noufofVRJc+2MKnfL/JulCEL3dVyWVak3fnpl129BMdKJ9ZDsMlAlWeYqP0b9dwE
+Mye1r5ef8rJ8fLBeb8jZrM3+Tlh9OwV7dV7tkmkCgYBMr2VZ7G1lGDnSvfRZZdsQ
+rXzds3YFqVGb74PS0bcDyo2WGyazUw+wOm8R1hfwCtH/loq0P25VUyjT9rDxdrOs
+VIeVPsBOVFxw4eBhdYnnqwG4j7RDcW5MeVmEKz9sRhHUkR2o3tY7k4Am1xuIEtxS
+kwku/WFwso+4rDNjGOeVXQKBgQDj+ZgywEuJ0SKAfUP8awNDb+AtyeCxsavG0ieU
+v6lOj0+z6kE8yaND1OfA3KVEjnNyzsRBrgDnICwS+x0g0aDm6LLq9feSCsfyEe6e
+h753DMstfOFuldojK5vr73KC7/I8yhobqotx7Hq4Yistt76LBf5w+Ly7Aggp0TAq
+qKRUgQKBgDDzyJ8fvVYh/UzI/efY/nhIwkeD3KErszrnle39rKkFfaRaHM66IYF4
+5+1q3UdKGLdt6k1S4B3aLZxAVt7F6SzTFd3amGDJNi0Om0MJ8LnftUL5uBzTloem
+GB+XehWJz0NJcDLdTXN86f1j2LZUEiFR62nSt0uwq3mwMuGrqaEG
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIDITCCAgkCFDA8Dkntpi2PSSJDZGYjjG03qNbnMA0GCSqGSIb3DQEBCwUAMFQx
+CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UECgwNRXhhbXBsZSwgSW5j
+LjEgMB4GA1UEAwwXQ2VydGlmaWNhdGUgQXV0aG9yaXR5IEEwHhcNMjIwOTE5MDkw
+NTM3WhcNMzIwOTE2MDkwNTM3WjBGMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0Ex
+FjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIw
+DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANf/CIDabqtejDWqmhzUgrnMpdDW
+yBWr51/pzpft2Lvf5R70Oh2ODxFNTGvbQzi9lTjdT6xcfrSdQk2OkQ5+T6Y+qTqS
+HiMcdbBpMbyVXjIbJsKIwd40FCROq4K1L7FK9WM72ycMLX4K/jyQ+LsUvFefJEHW
+5TXFhmU/ATM62nG927TRZ9p+tibl0Sk1E8F8ZPTxHGCXXlpFJbt9PXU77fIqXsoC
+zMVMD/qIkY0weZQwOFcnR6mFgYtIlT4HJ+UHMZ8Zd51ckxBz7M7P/H7ArKpMZIU5
+pMgXhQHgzKCMTQ1SM/dgtLs7ds1EQgb9BDgdKCIoPU059TNuzuNtXWBgKCMCAwEA
+ATANBgkqhkiG9w0BAQsFAAOCAQEAJUfkmbPE2mrHu12gmsm6nSU7M1l1KELzMTRH
+eZf2NYaqLOOqlz7McsKgu5LJTRmEXi9ufUC1HQfKJLOaj2LkLmVgKzTrP33GQ4wf
+a7WOLeWs90kbiXV71iBBBXEuusMnMzuvBbcJTohwI5/svwCqEISpnSpVLi66dAej
+BTTT0MD5KZWcznMeD/nOMIu+5j0tNBGdCHwLXxbmyuqzBFmMmAJAm9WILGhAZGKk
+5IXbHrZYMEOMXoY2e/NnaMK9Q81q1YgeXZWBKLTF2g3RSrKFm84jBouPR1j7qmsr
+xST6nM0Ngr28dzYCLxOY7p6xqYVo6rxxMfSz2jkM6pycLgwy/w==
+-----END CERTIFICATE-----
diff --git 
a/extensions/standard-processors/tests/unit/resources/localhost_by_B.pem 
b/extensions/standard-processors/tests/unit/resources/localhost_by_B.pem
new file mode 100644
index 000000000..0912e95ea
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/resources/localhost_by_B.pem
@@ -0,0 +1,46 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAwfjLXc6dHBaYsoWQaqsqwhJY68G7+35NVIU8BAKabHcUN5us
+IWieeg2Po1CPLoBfyfAq+06fj1T8D/3irUD/EB3QvDx1gCc+zY9BobIJKLi3Xeit
+zE7gEDDV1FFw8E2fBAptTMn/1GXyD5xsE9HQvtTffgELNXzl5Zp6pS5V6oN2X9Np
+3lExRFMmrEZmKuZMhA4UjLahA21Hv8GNlI4tqFazj6/timvSJHtqVGruhm/PAw7F
+1lOZ34t6/h/suayqRpDSZF+E7gnaFL8px/tpQwpLfHEixiMXjVrGBJRFnvHH8sdR
+wpU2mSR4j6KoUxS1wbW0iiOim3Fwy7vOd3ezvQIDAQABAoIBACWGAaFmBNKYNHXk
+jKl171GXxwfkdH8UUdVV6ORFtKXi61BOlx/nYzDtSqonPWubfexMv6PZ89gAcrqN
+PLqTZkQx4F1pvLlL3kRZwDKNhGQSR7as+mIZqBK5v8PQ9W4nNenMMpS2Rv1Js2f9
+tJKo9h7Ug1+WyBpSzQ57sdoeepRg/9pA489XjdqPIuyDSydhCEqqVe7slU62q1d+
+AjxTZ5tmD8dnbV0TWOphvBM8bVQc4UA/eMGIrS0jbkDLIuT63ZrPzVtaEqYK56O4
+IsWjaweFeIUJhtSyZ8/OA+7kqndxEZLCBY4XmUt+z2tyqtNg7rPcOwD04LWBPjW2
+KV60DOkCgYEA4+6kg8G098u7EkilpkuAmgnskm8Lw7Ic8IhJa8n7UVLFg4PNuPz9
+dcYcpy2wSu5CzSqNWNe9lePXX7VGVpSLvNxDg0R8L3m8m0Pu5sv17RdmKKJJz9D3
+kYCPgm6qitoTOKK2hOEiqVHzS4RydjaouyZDTFs1U/eHOkmR33JMP4MCgYEA2duV
+HKBwgP2PxVKUwwyqPK4spHVAfGh0cZWzTShvfB8DiLO9RyjmD9g9HMlEqc2FI9O5
+vhEsrWbVHQQ/kJHPLTCF7OKYYR+K9b8rezRvcQuvxCOnOcsatpahl5Fgt2o28SDn
+eu/2dh2NmTrM4jqNx4tvha5EVXUmbr7Qf+dgm78CgYEA0Kk7elLutJqRm19eJiqg
+hGPpavS1tGVuENTzQfYaWIyJvKgAwQT5k8PVn0Y4SaBtDx2RYG/AY2O9WyS8S66Z
+bj/Gnnknpt6vRwSdxDOb43y0TSako9cNjOpAdouRHKQfTI3IwUTJUnBvZgbOMmI/
+fXS9zz0ASOolpbqMDB66prkCgYB7CCHul8DRZ+EQq7FtcbKWMDrv6XOwjoDsQIGQ
+2nwTWaRySCdlj3hVjGX+4r9PMcy1zfVAnIxhpQhHqcWIDIA24gdQHyu09c5ROFQC
+8TraWaI6n3PqFISShwDdCvHWwzoh9NYlPG0wiUIVPfrE7BJzlZA2q5LVvCInOsWe
+5flOGwKBgCOyshMpw8FPCuNE8gEONH7aQ03MphLGzMcDVD7rl3I8d35z/IRhNgas
+V+I+Cfp6Tde7Ad8fNXX7ogoxxX/1UvkWGKg70ogqW9cKBqLsy7Pa+JMH201roJWR
+aODbnz02V1pQ8MBT4u7QG6PNyyue5W6h/2ADZCYvQJM8lrppKFE+
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIDITCCAgkCFDYWRAThkKd/J+oMW7tZBqEPx4XOMA0GCSqGSIb3DQEBCwUAMFQx
+CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UECgwNRXhhbXBsZSwgSW5j
+LjEgMB4GA1UEAwwXQ2VydGlmaWNhdGUgQXV0aG9yaXR5IEIwHhcNMjIwOTE5MDkw
+NTM3WhcNMzIwOTE2MDkwNTM3WjBGMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0Ex
+FjAUBgNVBAoMDUV4YW1wbGUsIEluYy4xEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIw
+DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMH4y13OnRwWmLKFkGqrKsISWOvB
+u/t+TVSFPAQCmmx3FDebrCFonnoNj6NQjy6AX8nwKvtOn49U/A/94q1A/xAd0Lw8
+dYAnPs2PQaGyCSi4t13orcxO4BAw1dRRcPBNnwQKbUzJ/9Rl8g+cbBPR0L7U334B
+CzV85eWaeqUuVeqDdl/Tad5RMURTJqxGZirmTIQOFIy2oQNtR7/BjZSOLahWs4+v
+7Ypr0iR7alRq7oZvzwMOxdZTmd+Lev4f7LmsqkaQ0mRfhO4J2hS/Kcf7aUMKS3xx
+IsYjF41axgSURZ7xx/LHUcKVNpkkeI+iqFMUtcG1tIojoptxcMu7znd3s70CAwEA
+ATANBgkqhkiG9w0BAQsFAAOCAQEAPa5w9kshcNgeOdsWJnKrGy31Jmhbi00a0ue0
+PSv1K49wvRIiHjk49DhOjHLRDoyEZ6AHme4dJIZ7G4GL4dKyW8eVi22nCN/2G6+u
+vssUXXNTTnaOXIXVVtnyTeMr4JHcysn0wMsMsApCvkpyB2euC+uvA8ppvfr6Zdng
+3okbQGhTvhkBZM2/jbtPb8O1XzXepPeYlXMiOcRsSA4oy5sYi8BFXuODCtH2qJD4
+zuSGEpWrDbzqUPGmXSoLALzpObI4v2yDLgrYZMMfOXOmtmeD1gfyIptl/pSeAko8
+lXxhgAY2ef2P1j2SCMIwNTPtIIrqJLmCt7EUWXjSpnnEIWP6bA==
+-----END CERTIFICATE-----
diff --git a/libminifi/include/utils/StringUtils.h 
b/libminifi/include/utils/StringUtils.h
index 942f0e13a..2b9b6d581 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -37,6 +37,15 @@
 #include "utils/gsl.h"
 #include "utils/meta/detected.h"
 
+// libc++ doesn't define operator<=> on strings, and apparently the operator 
rewrite rules don't automagically make one
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION < 16000
+#include <compare>
+
+constexpr std::strong_ordering operator<=>(const std::string& lhs, const 
std::string& rhs) noexcept {
+  return lhs.compare(rhs) <=> 0;
+}
+#endif
+
 namespace org::apache::nifi::minifi {
 namespace utils {
 
diff --git a/libminifi/include/utils/TimeUtil.h 
b/libminifi/include/utils/TimeUtil.h
index ce32146e9..2aab23ba4 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -33,7 +33,7 @@
 #include <memory>
 
 // libc++ doesn't define operator<=> on durations, and apparently the operator 
rewrite rules don't automagically make one
-#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION < 16000
 #include <compare>
 #endif
 
@@ -41,7 +41,7 @@
 
 #define TIME_FORMAT "%Y-%m-%d %H:%M:%S"
 
-#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION < 16000
 template<typename Rep1, typename Period1, typename Rep2, typename Period2>
 std::strong_ordering operator<=>(std::chrono::duration<Rep1, Period1> lhs, 
std::chrono::duration<Rep2, Period2> rhs) {
   if (lhs < rhs) {
diff --git a/libminifi/src/utils/net/SslServer.cpp 
b/libminifi/src/utils/net/SslServer.cpp
index 2d1e14807..682486c40 100644
--- a/libminifi/src/utils/net/SslServer.cpp
+++ b/libminifi/src/utils/net/SslServer.cpp
@@ -81,7 +81,6 @@ SslServer::SslServer(std::optional<size_t> max_queue_size, 
uint16_t port, std::s
     } else if (client_auth == ClientAuthOption::WANT) {
       context_.set_verify_mode(asio::ssl::verify_peer);
     }
-  startAccept();
 }
 
 std::shared_ptr<SslSession> SslServer::createSession() {
diff --git a/libminifi/src/utils/net/TcpServer.cpp 
b/libminifi/src/utils/net/TcpServer.cpp
index e3a53db55..742ef7f25 100644
--- a/libminifi/src/utils/net/TcpServer.cpp
+++ b/libminifi/src/utils/net/TcpServer.cpp
@@ -58,7 +58,6 @@ void TcpSession::handleReadUntilNewLine(std::error_code 
error_code) {
 
 TcpServer::TcpServer(std::optional<size_t> max_queue_size, uint16_t port, 
std::shared_ptr<core::logging::Logger> logger)
     : SessionHandlingServer<TcpSession>(max_queue_size, port, 
std::move(logger)) {
-  startAccept();
 }
 
 std::shared_ptr<TcpSession> TcpServer::createSession() {


Reply via email to