fgerlits commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r1004155881


##########
PROCESSORS.md:
##########
@@ -2167,6 +2168,32 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | success | After a successful SQL update operation, the incoming FlowFile 
sent here |
 
 
+## PutTCP
+
+### Description
+The PutTCP processor receives a FlowFile and transmits the FlowFile content 
over a TCP connection to the configured TCP server. By default, the FlowFiles 
are transmitted over the same TCP connection. To assist the TCP server with 
determining message boundaries, an optional "Outgoing Message Delimiter" string 
can be configured which is appended to the end of each FlowFiles content when 
it is transmitted over the TCP connection. An optional "Connection Per 
FlowFile" parameter can be specified to change the behaviour so that each 
FlowFiles content is transmitted over a single TCP connection which is closed 
after the FlowFile has been sent.
+
+### Properties
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name                           | Default Value | Allowable Values | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                                                
   |
+|--------------------------------|---------------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Hostname**                   | localhost     |                  | The ip 
address or hostname of the destination.<br/>**Supports Expression Language: 
true**                                                                          
                                                                                
                                                                                
                                                                                
|
+| **Port**                       |               |                  | The port 
or service on the destination.<br/>**Supports Expression Language: true**       
                                                                                
                                                                                
                                                                                
                                                                          |
+| **Idle Connection Expiration** | 15 seconds    |                  | The 
amount of time a connection should be held open without being used before 
closing the connection. A value of 0 seconds will disable this 
feature.<br/>**Supports Expression Language: true**                             
                                                                                
                                                                                
                      |
+| **Timeout**                    | 15 seconds    |                  | The 
timeout for connecting to and communicating with the 
destination.<br/>**Supports Expression Language: true**                         
                                                                                
                                                                                
                                                                                
                          |
+| **Connection Per FlowFile**    | false         |                  | 
Specifies whether to send each FlowFile's content on an individual connection.  
                                                                                
                                                                                
                                                                                
                                                                                
   |
+| Outgoing Message Delimiter     |               |                  | 
Specifies the delimiter to use when sending messages out over the same TCP 
stream. The delimiter is appended to each FlowFile message that is transmitted 
over the stream so that the receiver can determine when one message ends and 
the next message begins. Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.<br/>**Supports Expression 
Language: true** |
+| SSL Context Service            |               |                  | The 
Controller Service to use in order to obtain an SSL Context. If this property 
is set, messages will be sent over a secure connection.                         
                                                                                
                                                                                
                                                                                
 |
+| Max Size of Socket Send Buffer |               |                  | The 
maximum size of the socket send buffer that should be used. This is a 
suggestion to the Operating System to indicate how big the socket buffer should 
be.                                                                             
                                                                                
                                                                                
         |
+
+### Properties
+| Name    | Description                                                        
        |
+|---------|----------------------------------------------------------------------------|
+| success | FlowFiles that are sent to the destination are sent out this 
relationship. |
+| failure | FlowFiles that encountered IO errors are send out this 
relationship.       |

Review Comment:
   tiny typo:
   ```suggestion
   | failure | FlowFiles that encountered IO errors are sent out this 
relationship.       |
   ```



##########
libminifi/include/utils/StringUtils.h:
##########
@@ -37,6 +37,19 @@
 #include "utils/gsl.h"
 #include "utils/meta/detected.h"
 
+// libc++ doesn't define operator<=> on strings, and apparently the operator 
rewrite rules don't automagically make one
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+#include <compare>
+#endif
+
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+template<typename _CharT, typename _Traits, typename _Alloc>
+constexpr std::strong_ordering operator<=>(const std::basic_string<_CharT, 
_Traits, _Alloc>& __lhs,
+    const std::basic_string<_CharT, _Traits, _Alloc>& __rhs) noexcept {
+  return __lhs.compare(__rhs) <=> 0;
+}
+#endif

Review Comment:
   I would change the version check to `< 16000` because it looks like this is 
still not fixed in clang 15: https://godbolt.org/z/79W6jcEbe



##########
extensions/standard-processors/processors/PutTCP.h:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <utility>
+
+#include "Processor.h"
+#include "utils/Export.h"
+#include "controllers/SSLContextService.h"
+
+#include "utils/expected.h"
+#include "utils/StringUtils.h"  // for string <=> on libc++
+
+namespace org::apache::nifi::minifi::processors {
+
+class ConnectionId {
+ public:
+  ConnectionId(std::string hostname, std::string port) : 
hostname_(std::move(hostname)), port_(std::move(port)) {}
+
+  struct hash {
+    std::size_t operator () (const ConnectionId& connection_id) const {
+      return 
utils::hash_combine(std::hash<std::string>{}(connection_id.hostname_), 
std::hash<std::string>{}(connection_id.port_));
+    }
+  };
+
+  auto operator<=>(const ConnectionId&) const = default;
+
+  std::string& getHostname() { return hostname_; }
+  std::string& getPort() { return port_; }

Review Comment:
   could these return `const std::string&`?



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SocketType>
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+    std::lock_guard lock_guard{mutex_};
+    return sessions_.size();
+  }
+
+  void closeSessions() override {
+    std::lock_guard lock_guard{mutex_};
+    for (const auto& session_weak : sessions_) {
+      if (auto session = session_weak.lock()) {
+        auto& socket = session->getSocket();
+        if (socket.is_open()) {
+          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+          session->getSocket().close();
+        }
+      }
+    }
+  }
+
+  mutable std::mutex mutex_;
+  std::vector<std::weak_ptr<SocketType>> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public 
SessionAwareServer<TcpSession> {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr<TcpSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = TcpServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public 
SessionAwareServer<SslSession> {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr<SslSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = SslServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());

Review Comment:
   typo:
   ```suggestion
       logger_->log_trace("SessionAwareSslServer::createSession %p", 
session.get());
   ```



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<tcp::socket>;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The ip address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port or service on the destination.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+    ->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with 
the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+    ->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+    ->withDefaultValue(false)
+    ->isRequired(true)
+    ->supportsExpressionLanguage(false)
+    ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+    ->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+                      "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+                      "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()
+    ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+    ->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+    ->isRequired(false)
+    ->asType<core::DataSizeValue>()
+    ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid)
+    : Processor(name, uuid) {}
+
+PutTCP::~PutTCP() = default;
+
+void PutTCP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+void PutTCP::notifyStop() {}
+
+void PutTCP::onSchedule(core::ProcessContext* const context, 
core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+  }
+  if (context->getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+  }
+  if (auto idle_connection_expiration = 
context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); 
timeout && timeout->getMilliseconds() > 0ms)
+    timeout_ = timeout->getMilliseconds();
+  else
+    timeout_ = 15s;
+
+  std::string context_name;
+  if (context->getProperty(SSLContextService.getName(), context_name) && 
!IsNullOrEmpty(context_name))
+    ssl_context_service_ = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
+
+  delimiter_.clear();
+  if (auto delimiter_str = context->getProperty(OutgoingMessageDelimiter)) {
+    std::transform(std::begin(*delimiter_str), std::end(*delimiter_str), 
std::back_inserter(delimiter_), [](char c) {
+      return std::byte(c);
+    });
+  }
+
+  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  if (auto max_size_of_socket_send_buffer = 
context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
+    max_size_of_socket_send_buffer_ = 
max_size_of_socket_send_buffer->getValue();
+  else
+    max_size_of_socket_send_buffer_.reset();
+}
+
+namespace {
+template<class SocketType>
+class ConnectionHandler : public IConnectionHandler {
+ public:
+  ConnectionHandler(ConnectionId connection_id,
+                    std::chrono::milliseconds timeout,
+                    std::shared_ptr<core::logging::Logger> logger,
+                    std::optional<size_t> max_size_of_socket_send_buffer,
+                    std::shared_ptr<controllers::SSLContextService> 
ssl_context_service)
+      : connection_id_(std::move(connection_id)),
+        timeout_(timeout),
+        logger_(std::move(logger)),
+        max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
+        ssl_context_service_(std::move(ssl_context_service)) {
+  }
+  ~ConnectionHandler() override = default;
+
+  nonstd::expected<void, std::error_code> sendData(const 
std::vector<std::byte>& data, const std::vector<std::byte>& delimiter) override;
+
+ private:
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
+
+  [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const 
override {
+    if (!last_used_)
+      return false;
+    return *last_used_ >= (std::chrono::steady_clock::now() - dur);
+  }
+
+  void reset() override {
+    last_used_.reset();
+    socket_.reset();
+    io_context_.reset();
+    last_error_.clear();
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+  }
+
+  void checkDeadline(const std::error_code& error_code, const 
std::shared_ptr<SocketType>& socket);
+  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const 
std::shared_ptr<SocketType>& socket);
+
+  void handleConnect(const std::error_code& error,
+                     tcp::resolver::results_type::iterator endpoint_iter,
+                     const std::shared_ptr<SocketType>& socket);
+  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& 
endpoint_iter,
+                               const std::shared_ptr<SocketType>& socket);
+  void handleHandshake(const std::error_code& error,
+                       const tcp::resolver::results_type::iterator& 
endpoint_iter,
+                       const std::shared_ptr<SocketType>& socket);
+
+  void handleWrite(const std::error_code& error,
+                   std::size_t bytes_written,
+                   const std::vector<std::byte>& delimiter,
+                   const std::shared_ptr<SocketType>& socket);
+
+  void handleDelimiterWrite(const std::error_code& error, std::size_t 
bytes_written, const std::shared_ptr<SocketType>& socket);
+
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> 
establishConnection(const tcp::resolver::results_type& resolved_query);
+
+  [[nodiscard]] bool hasBeenUsed() const override { return 
last_used_.has_value(); }
+
+  ConnectionId connection_id_;
+  std::optional<std::chrono::steady_clock::time_point> last_used_;
+  asio::io_context io_context_;
+  std::error_code last_error_;
+  asio::steady_timer deadline_{io_context_};
+  std::chrono::milliseconds timeout_;
+  std::shared_ptr<SocketType> socket_;
+
+  std::shared_ptr<core::logging::Logger> logger_;
+  std::optional<size_t> max_size_of_socket_send_buffer_;
+
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+
+  nonstd::expected<tcp::resolver::results_type, std::error_code> 
resolveHostname();
+  nonstd::expected<void, std::error_code> sendDataToSocket(const 
std::shared_ptr<SocketType>& socket, const std::vector<std::byte>& data, const 
std::vector<std::byte>& delimiter);
+};
+
+template<class SocketType>
+nonstd::expected<void, std::error_code> 
ConnectionHandler<SocketType>::sendData(const std::vector<std::byte>& data, 
const std::vector<std::byte>& delimiter) {
+  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& 
socket) { return sendDataToSocket(socket, data, delimiter); });;
+}
+
+template<class SocketType>
+nonstd::expected<std::shared_ptr<SocketType>, std::error_code> 
ConnectionHandler<SocketType>::getSocket() {
+  if (socket_ && socket_->lowest_layer().is_open())
+    return socket_;
+  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& 
resolved_query) { return establishConnection(resolved_query); });
+  if (!new_socket)
+    return nonstd::make_unexpected(new_socket.error());
+  socket_ = std::move(*new_socket);
+  return socket_;
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::checkDeadline(const std::error_code& 
error_code, const std::shared_ptr<SocketType>& socket) {
+  if (error_code != asio::error::operation_aborted) {
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+    last_error_ = asio::error::timed_out;
+    deadline_.async_wait([&](const std::error_code& error_code) { 
checkDeadline(error_code, socket); });

Review Comment:
   why is this new `async_wait` needed?



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SocketType>
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+    std::lock_guard lock_guard{mutex_};
+    return sessions_.size();
+  }
+
+  void closeSessions() override {
+    std::lock_guard lock_guard{mutex_};
+    for (const auto& session_weak : sessions_) {
+      if (auto session = session_weak.lock()) {
+        auto& socket = session->getSocket();
+        if (socket.is_open()) {
+          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+          session->getSocket().close();
+        }
+      }
+    }
+  }
+
+  mutable std::mutex mutex_;
+  std::vector<std::weak_ptr<SocketType>> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public 
SessionAwareServer<TcpSession> {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr<TcpSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = TcpServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public 
SessionAwareServer<SslSession> {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr<SslSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = SslServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = 
minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources/ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / 
"resources/localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / 
"resources/localhost_by_A.pem").string();

Review Comment:
   it would be better to use the / operator in both places



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SocketType>

Review Comment:
   this should be `SessionType` instead of `SocketType`, as it can either be 
`TcpSession` or `SslSession`



##########
extensions/standard-processors/processors/PutTCP.h:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <utility>
+
+#include "Processor.h"
+#include "utils/Export.h"
+#include "controllers/SSLContextService.h"
+
+#include "utils/expected.h"
+#include "utils/StringUtils.h"  // for string <=> on libc++
+
+namespace org::apache::nifi::minifi::processors {
+
+class ConnectionId {
+ public:
+  ConnectionId(std::string hostname, std::string port) : 
hostname_(std::move(hostname)), port_(std::move(port)) {}
+
+  struct hash {
+    std::size_t operator () (const ConnectionId& connection_id) const {
+      return 
utils::hash_combine(std::hash<std::string>{}(connection_id.hostname_), 
std::hash<std::string>{}(connection_id.port_));
+    }
+  };
+
+  auto operator<=>(const ConnectionId&) const = default;
+
+  std::string& getHostname() { return hostname_; }
+  std::string& getPort() { return port_; }
+
+ private:
+  std::string hostname_;
+  std::string port_;
+};
+
+class IConnectionHandler {
+ public:
+  virtual ~IConnectionHandler() = default;
+
+  [[nodiscard]] virtual bool hasBeenUsed() const = 0;
+  [[nodiscard]] virtual bool hasBeenUsedIn(std::chrono::milliseconds dur) 
const = 0;
+  virtual nonstd::expected<void, std::error_code> sendData(const 
std::vector<std::byte>& data, const std::vector<std::byte>& delimiter) = 0;
+  virtual void reset() = 0;
+};
+
+class PutTCP final : public core::Processor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "The PutTCP 
processor receives a FlowFile and transmits the FlowFile content over a TCP 
connection to the configured TCP server. "
+      "By default, the FlowFiles are transmitted over the same TCP connection. 
To assist the TCP server with determining message boundaries, "
+      "an optional \"Outgoing Message Delimiter\" string can be configured 
which is appended to the end of each FlowFiles content when it is transmitted 
over the TCP connection. "
+      "An optional \"Connection Per FlowFile\" parameter can be specified to 
change the behaviour so that each FlowFiles content is transmitted over a 
single TCP connection "
+      "which is closed after the FlowFile has been sent.";
+  EXTENSIONAPI static const core::Property Hostname;
+  EXTENSIONAPI static const core::Property Port;
+  EXTENSIONAPI static const core::Property IdleConnectionExpiration;
+  EXTENSIONAPI static const core::Property Timeout;
+  EXTENSIONAPI static const core::Property ConnectionPerFlowFile;
+  EXTENSIONAPI static const core::Property OutgoingMessageDelimiter;
+  EXTENSIONAPI static const core::Property SSLContextService;
+  EXTENSIONAPI static const core::Property MaxSizeOfSocketSendBuffer;
+
+  static auto properties() { return std::array{Hostname, Port, 
IdleConnectionExpiration, Timeout, ConnectionPerFlowFile, 
OutgoingMessageDelimiter, SSLContextService, MaxSizeOfSocketSendBuffer}; }
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+  static auto relationships() { return std::array{Success, Failure}; }
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  explicit PutTCP(const std::string& name, const utils::Identifier& uuid = {});
+  PutTCP(const PutTCP&) = delete;
+  PutTCP& operator=(const PutTCP&) = delete;
+  ~PutTCP() final;
+
+  void initialize() final;
+  void notifyStop() final;
+  void onSchedule(core::ProcessContext*, core::ProcessSessionFactory *) final;
+  void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
+
+ private:
+  void removeExpiredConnections();
+  void processFlowFile(std::shared_ptr<IConnectionHandler>& connection_handler,
+                       const std::vector<std::byte>& data,
+                       core::ProcessSession& session,
+                       const std::shared_ptr<core::FlowFile>& flow_file);
+
+  std::vector<std::byte> delimiter_;
+  std::optional<std::unordered_map<ConnectionId, 
std::shared_ptr<IConnectionHandler>, ConnectionId::hash>> connections_;
+  std::optional<std::chrono::milliseconds> idle_connection_expiration_;
+  std::optional<size_t> max_size_of_socket_send_buffer_;
+  std::chrono::milliseconds timeout_;

Review Comment:
   `timeout_` should be initialized



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<tcp::socket>;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The ip address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port or service on the destination.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+    ->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with 
the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+    ->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+    ->withDefaultValue(false)
+    ->isRequired(true)
+    ->supportsExpressionLanguage(false)
+    ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+    ->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+                      "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+                      "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()
+    ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+    ->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+    ->isRequired(false)
+    ->asType<core::DataSizeValue>()
+    ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid)
+    : Processor(name, uuid) {}
+
+PutTCP::~PutTCP() = default;
+
+void PutTCP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+void PutTCP::notifyStop() {}
+
+void PutTCP::onSchedule(core::ProcessContext* const context, 
core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+  }
+  if (context->getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+  }
+  if (auto idle_connection_expiration = 
context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); 
timeout && timeout->getMilliseconds() > 0ms)
+    timeout_ = timeout->getMilliseconds();
+  else
+    timeout_ = 15s;
+
+  std::string context_name;
+  if (context->getProperty(SSLContextService.getName(), context_name) && 
!IsNullOrEmpty(context_name))
+    ssl_context_service_ = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
+
+  delimiter_.clear();
+  if (auto delimiter_str = context->getProperty(OutgoingMessageDelimiter)) {
+    std::transform(std::begin(*delimiter_str), std::end(*delimiter_str), 
std::back_inserter(delimiter_), [](char c) {
+      return std::byte(c);
+    });

Review Comment:
   I think `delimiter = ranges::views::transform(*delimiter_str, [](char c) { 
return std::byte(c); }) | ranges::to<std::vector>();` would be nicer (also 
maybe `static_cast` instead of C-style cast)



##########
extensions/standard-processors/tests/unit/resources/bob_by_A.pem:
##########
@@ -0,0 +1,46 @@
+-----BEGIN RSA PRIVATE KEY-----

Review Comment:
   it doesn't look like we use these two `bob_by_...` certificates; can we 
remove them?



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SocketType>
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+    std::lock_guard lock_guard{mutex_};
+    return sessions_.size();
+  }
+
+  void closeSessions() override {
+    std::lock_guard lock_guard{mutex_};
+    for (const auto& session_weak : sessions_) {
+      if (auto session = session_weak.lock()) {
+        auto& socket = session->getSocket();
+        if (socket.is_open()) {
+          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+          session->getSocket().close();
+        }
+      }
+    }
+  }
+
+  mutable std::mutex mutex_;
+  std::vector<std::weak_ptr<SocketType>> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public 
SessionAwareServer<TcpSession> {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr<TcpSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = TcpServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public 
SessionAwareServer<SslSession> {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr<SslSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = SslServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = 
minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources/ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / 
"resources/localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / 
"resources/localhost_by_A.pem").string();
+  return ssl_data;
+}
+}  // namespace
+
+class PutTCPTestFixture {
+ public:
+  PutTCPTestFixture() {
+    LogTestController::getInstance().setTrace<PutTCP>();
+    LogTestController::getInstance().setInfo<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<utils::net::Server>();
+    put_tcp_->setProperty(PutTCP::Hostname, "${literal('localhost')}");
+    put_tcp_->setProperty(PutTCP::Port, 
utils::StringUtils::join_pack("${literal('", std::to_string(port_), "')}"));
+    put_tcp_->setProperty(PutTCP::Timeout, "200 ms");
+    put_tcp_->setProperty(PutTCP::OutgoingMessageDelimiter, "\n");
+  }
+
+  ~PutTCPTestFixture() {
+    stopServer();
+  }
+
+  void startTCPServer() {
+    gsl_Expects(!listener_ && !server_thread_.joinable());
+    listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port_, 
core::logging::LoggerFactory<utils::net::Server>().getLogger());
+    server_thread_ = std::thread([this]() { listener_->run(); });
+  }
+
+  void startSSLServer() {
+    gsl_Expects(!listener_ && !server_thread_.joinable());
+    listener_ = std::make_unique<SessionAwareSslServer>(std::nullopt,
+        port_,
+        core::logging::LoggerFactory<utils::net::Server>().getLogger(),
+        createSslDataForServer(),
+        utils::net::SslServer::ClientAuthOption::REQUIRED);
+    server_thread_ = std::thread([this]() { listener_->run(); });
+  }
+
+  void stopServer() {
+    if (listener_)
+      listener_->stop();
+    if (server_thread_.joinable())
+      server_thread_.join();
+    listener_.reset();
+  }
+
+  size_t getNumberOfActiveSessions() {
+    if (auto session_aware_listener = 
dynamic_cast<ISessionAwareServer*>(listener_.get())) {
+      return session_aware_listener->getNumberOfSessions() - 1;  // There is 
always one inactive session waiting for a new connection
+    }
+    return -1;
+  }
+
+  void closeActiveConnections() {
+    if (auto session_aware_listener = 
dynamic_cast<ISessionAwareServer*>(listener_.get())) {
+      session_aware_listener->closeSessions();
+    }
+    std::this_thread::sleep_for(200ms);
+  }
+
+  auto trigger(const std::string_view& message) {
+    return controller_.trigger(message);
+  }
+
+  auto getContent(const auto& flow_file) {
+    return controller_.plan->getContent(flow_file);
+  }
+
+  std::optional<utils::net::Message> tryDequeueReceivedMessage() {
+    auto timeout = 200ms;
+    auto interval = 10ms;
+
+    auto start_time = std::chrono::system_clock::now();
+    utils::net::Message result;
+    while (start_time + timeout > std::chrono::system_clock::now()) {
+      if (listener_->tryDequeue(result))
+        return result;
+      std::this_thread::sleep_for(interval);
+    }
+    return std::nullopt;
+  }
+
+  void addSSLContextToPutTCP(const std::filesystem::path& ca_cert, 
std::optional<std::filesystem::path> client_cert_key) {

Review Comment:
   the second parameter should also be `const &`



##########
extensions/standard-processors/tests/unit/PutTCPTests.cpp:
##########
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "PutTCP.h"
+#include "controllers/SSLContextService.h"
+#include "core/ProcessSession.h"
+#include "utils/net/TcpServer.h"
+#include "utils/net/SslServer.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+using controllers::SSLContextService;
+
+namespace {
+using utils::net::TcpSession;
+using utils::net::TcpServer;
+
+using utils::net::SslSession;
+using utils::net::SslServer;
+
+class ISessionAwareServer {
+ public:
+  [[nodiscard]] virtual size_t getNumberOfSessions() const = 0;
+  virtual void closeSessions() = 0;
+};
+
+template<class SocketType>
+class SessionAwareServer : public ISessionAwareServer {
+ protected:
+  size_t getNumberOfSessions() const override {
+    std::lock_guard lock_guard{mutex_};
+    return sessions_.size();
+  }
+
+  void closeSessions() override {
+    std::lock_guard lock_guard{mutex_};
+    for (const auto& session_weak : sessions_) {
+      if (auto session = session_weak.lock()) {
+        auto& socket = session->getSocket();
+        if (socket.is_open()) {
+          socket.shutdown(asio::ip::tcp::socket::shutdown_both);
+          session->getSocket().close();
+        }
+      }
+    }
+  }
+
+  mutable std::mutex mutex_;
+  std::vector<std::weak_ptr<SocketType>> sessions_;
+};
+
+class SessionAwareTcpServer : public TcpServer, public 
SessionAwareServer<TcpSession> {
+ public:
+  using TcpServer::TcpServer;
+
+ protected:
+  std::shared_ptr<TcpSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = TcpServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+class SessionAwareSslServer : public SslServer, public 
SessionAwareServer<SslSession> {
+ public:
+  using SslServer::SslServer;
+
+ protected:
+  std::shared_ptr<SslSession> createSession() override {
+    std::lock_guard lock_guard{mutex_};
+    auto session = SslServer::createSession();
+    logger_->log_trace("SessionAwareTcpServer::createSession %p", 
session.get());
+    sessions_.emplace_back(session);
+    return session;
+  }
+};
+
+utils::net::SslData createSslDataForServer() {
+  const std::filesystem::path executable_dir = 
minifi::utils::file::FileUtils::get_executable_dir();
+  utils::net::SslData ssl_data;
+  ssl_data.ca_loc = (executable_dir / "resources/ca_A.crt").string();
+  ssl_data.cert_loc = (executable_dir / 
"resources/localhost_by_A.pem").string();
+  ssl_data.key_loc = (executable_dir / 
"resources/localhost_by_A.pem").string();
+  return ssl_data;
+}
+}  // namespace
+
+class PutTCPTestFixture {
+ public:
+  PutTCPTestFixture() {
+    LogTestController::getInstance().setTrace<PutTCP>();
+    LogTestController::getInstance().setInfo<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<utils::net::Server>();
+    put_tcp_->setProperty(PutTCP::Hostname, "${literal('localhost')}");
+    put_tcp_->setProperty(PutTCP::Port, 
utils::StringUtils::join_pack("${literal('", std::to_string(port_), "')}"));
+    put_tcp_->setProperty(PutTCP::Timeout, "200 ms");
+    put_tcp_->setProperty(PutTCP::OutgoingMessageDelimiter, "\n");
+  }
+
+  ~PutTCPTestFixture() {
+    stopServer();
+  }
+
+  void startTCPServer() {
+    gsl_Expects(!listener_ && !server_thread_.joinable());
+    listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port_, 
core::logging::LoggerFactory<utils::net::Server>().getLogger());

Review Comment:
   `getLogger` is static, so the constructor call is not needed:
   ```suggestion
       listener_ = std::make_unique<SessionAwareTcpServer>(std::nullopt, port_, 
core::logging::LoggerFactory<utils::net::Server>::getLogger());
   ```
   
   (also in line 141)



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<tcp::socket>;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The ip address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Port = 
core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port or service on the destination.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration = 
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+    ->withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with 
the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile = 
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+    ->withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+    ->withDefaultValue(false)
+    ->isRequired(true)
+    ->supportsExpressionLanguage(false)
+    ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter = 
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+    ->withDescription("Specifies the delimiter to use when sending messages 
out over the same TCP stream. "
+                      "The delimiter is appended to each FlowFile message that 
is transmitted over the stream so that the receiver can determine when one 
message ends and the next message begins. "
+                      "Users should ensure that the FlowFile content does not 
contain the delimiter character to avoid errors.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutTCP::SSLContextService = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, messages will be sent over a secure 
connection.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()
+    ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer = 
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+    ->withDescription("The maximum size of the socket send buffer that should 
be used. This is a suggestion to the Operating System to indicate how big the 
socket buffer should be.")
+    ->isRequired(false)
+    ->asType<core::DataSizeValue>()
+    ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid)
+    : Processor(name, uuid) {}
+
+PutTCP::~PutTCP() = default;
+
+void PutTCP::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+void PutTCP::notifyStop() {}
+
+void PutTCP::onSchedule(core::ProcessContext* const context, 
core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+  }
+  if (context->getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+  }
+  if (auto idle_connection_expiration = 
context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); 
timeout && timeout->getMilliseconds() > 0ms)
+    timeout_ = timeout->getMilliseconds();
+  else
+    timeout_ = 15s;
+
+  std::string context_name;
+  if (context->getProperty(SSLContextService.getName(), context_name) && 
!IsNullOrEmpty(context_name))
+    ssl_context_service_ = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
+
+  delimiter_.clear();
+  if (auto delimiter_str = context->getProperty(OutgoingMessageDelimiter)) {
+    std::transform(std::begin(*delimiter_str), std::end(*delimiter_str), 
std::back_inserter(delimiter_), [](char c) {
+      return std::byte(c);
+    });
+  }
+
+  if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  if (auto max_size_of_socket_send_buffer = 
context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
+    max_size_of_socket_send_buffer_ = 
max_size_of_socket_send_buffer->getValue();
+  else
+    max_size_of_socket_send_buffer_.reset();
+}
+
+namespace {
+template<class SocketType>
+class ConnectionHandler : public IConnectionHandler {
+ public:
+  ConnectionHandler(ConnectionId connection_id,
+                    std::chrono::milliseconds timeout,
+                    std::shared_ptr<core::logging::Logger> logger,
+                    std::optional<size_t> max_size_of_socket_send_buffer,
+                    std::shared_ptr<controllers::SSLContextService> 
ssl_context_service)
+      : connection_id_(std::move(connection_id)),
+        timeout_(timeout),
+        logger_(std::move(logger)),
+        max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
+        ssl_context_service_(std::move(ssl_context_service)) {
+  }
+  ~ConnectionHandler() override = default;
+
+  nonstd::expected<void, std::error_code> sendData(const 
std::vector<std::byte>& data, const std::vector<std::byte>& delimiter) override;
+
+ private:
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
+
+  [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const 
override {
+    if (!last_used_)
+      return false;
+    return *last_used_ >= (std::chrono::steady_clock::now() - dur);
+  }
+
+  void reset() override {
+    last_used_.reset();
+    socket_.reset();
+    io_context_.reset();
+    last_error_.clear();
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+  }
+
+  void checkDeadline(const std::error_code& error_code, const 
std::shared_ptr<SocketType>& socket);
+  void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const 
std::shared_ptr<SocketType>& socket);
+
+  void handleConnect(const std::error_code& error,
+                     tcp::resolver::results_type::iterator endpoint_iter,
+                     const std::shared_ptr<SocketType>& socket);
+  void handleConnectionSuccess(const tcp::resolver::results_type::iterator& 
endpoint_iter,
+                               const std::shared_ptr<SocketType>& socket);
+  void handleHandshake(const std::error_code& error,
+                       const tcp::resolver::results_type::iterator& 
endpoint_iter,
+                       const std::shared_ptr<SocketType>& socket);
+
+  void handleWrite(const std::error_code& error,
+                   std::size_t bytes_written,
+                   const std::vector<std::byte>& delimiter,
+                   const std::shared_ptr<SocketType>& socket);
+
+  void handleDelimiterWrite(const std::error_code& error, std::size_t 
bytes_written, const std::shared_ptr<SocketType>& socket);
+
+  nonstd::expected<std::shared_ptr<SocketType>, std::error_code> 
establishConnection(const tcp::resolver::results_type& resolved_query);
+
+  [[nodiscard]] bool hasBeenUsed() const override { return 
last_used_.has_value(); }
+
+  ConnectionId connection_id_;
+  std::optional<std::chrono::steady_clock::time_point> last_used_;
+  asio::io_context io_context_;
+  std::error_code last_error_;
+  asio::steady_timer deadline_{io_context_};
+  std::chrono::milliseconds timeout_;
+  std::shared_ptr<SocketType> socket_;
+
+  std::shared_ptr<core::logging::Logger> logger_;
+  std::optional<size_t> max_size_of_socket_send_buffer_;
+
+  std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+
+  nonstd::expected<tcp::resolver::results_type, std::error_code> 
resolveHostname();
+  nonstd::expected<void, std::error_code> sendDataToSocket(const 
std::shared_ptr<SocketType>& socket, const std::vector<std::byte>& data, const 
std::vector<std::byte>& delimiter);
+};
+
+template<class SocketType>
+nonstd::expected<void, std::error_code> 
ConnectionHandler<SocketType>::sendData(const std::vector<std::byte>& data, 
const std::vector<std::byte>& delimiter) {
+  return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>& 
socket) { return sendDataToSocket(socket, data, delimiter); });;
+}
+
+template<class SocketType>
+nonstd::expected<std::shared_ptr<SocketType>, std::error_code> 
ConnectionHandler<SocketType>::getSocket() {
+  if (socket_ && socket_->lowest_layer().is_open())
+    return socket_;
+  auto new_socket = resolveHostname() | utils::flatMap([&](const auto& 
resolved_query) { return establishConnection(resolved_query); });
+  if (!new_socket)
+    return nonstd::make_unexpected(new_socket.error());
+  socket_ = std::move(*new_socket);
+  return socket_;
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::checkDeadline(const std::error_code& 
error_code, const std::shared_ptr<SocketType>& socket) {
+  if (error_code != asio::error::operation_aborted) {
+    deadline_.expires_at(asio::steady_timer::time_point::max());
+    last_error_ = asio::error::timed_out;
+    deadline_.async_wait([&](const std::error_code& error_code) { 
checkDeadline(error_code, socket); });
+    socket->lowest_layer().close();
+  }
+}
+
+template<class SocketType>
+void 
ConnectionHandler<SocketType>::startConnect(tcp::resolver::results_type::iterator
 endpoint_iter, const std::shared_ptr<SocketType>& socket) {
+  if (endpoint_iter == tcp::resolver::results_type::iterator()) {
+    logger_->log_trace("No more endpoints to try");
+    deadline_.cancel();
+    return;
+  }
+
+  last_error_.clear();
+  deadline_.expires_after(timeout_);
+  socket->lowest_layer().async_connect(endpoint_iter->endpoint(),
+                                       [&socket, endpoint_iter, 
this](std::error_code err) {
+                                         handleConnect(err, endpoint_iter, 
socket);
+                                       });
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::handleConnect(const std::error_code& error,
+                                                   
tcp::resolver::results_type::iterator endpoint_iter,
+                                                   const 
std::shared_ptr<SocketType>& socket) {

Review Comment:
   the parameters of this function, and several others later in this file, are 
not aligned correctly



##########
extensions/standard-processors/tests/unit/resources/ca_B.crt:
##########
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----

Review Comment:
   From the unit test, it looks like this is an invalid certificate.  Is that 
true, and if it is, then can you put "invalid" in the file name, please?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to