This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new feb5a1b  MINIFICPP-1678 - Create PutUDP processor, add expected-lite
feb5a1b is described below

commit feb5a1b183dc82dd83c6ce53c8c20ba819c27863
Author: Marton Szasz <[email protected]>
AuthorDate: Wed Jan 12 13:04:41 2022 +0100

    MINIFICPP-1678 - Create PutUDP processor, add expected-lite
    
    Signed-off-by: Adam Debreceni <[email protected]>
    
    This closes #1208
---
 CMakeLists.txt                                     |   3 +
 LICENSE                                            |  30 ++
 NOTICE                                             |   2 +
 PROCESSORS.md                                      |  23 +
 README.md                                          |   2 +-
 cmake/ExpectedLite.cmake                           |  24 +
 .../expression-language/ProcessContextExpr.cpp     |   4 +
 .../expression-language/ProcessContextExpr.h       |  13 +-
 .../standard-processors/processors/PutUDP.cpp      | 163 +++++++
 extensions/standard-processors/processors/PutUDP.h |  54 +++
 .../TLSServerSocketSupportedProtocolsTest.cpp      |   9 +-
 .../standard-processors/tests/unit/PutUDPTests.cpp | 112 +++++
 .../tests/unit/TailFileTests.cpp                   |   2 +-
 libminifi/CMakeLists.txt                           |   4 +-
 libminifi/include/core/ConfigurableComponent.h     |  20 +-
 libminifi/include/core/FlowFile.h                  |  30 +-
 libminifi/include/core/ProcessContext.h            |  26 +-
 libminifi/include/core/ProcessSession.h            |  11 +-
 libminifi/include/core/Relationship.h              |   8 +-
 libminifi/include/io/ClientSocket.h                |  34 +-
 libminifi/include/utils/Deleters.h                 |  10 +-
 libminifi/include/utils/GeneralUtils.h             |   7 +-
 libminifi/include/utils/OptionalUtils.h            |  53 +--
 libminifi/include/utils/OsUtils.h                  |   1 -
 libminifi/include/utils/StringUtils.h              |  21 +-
 .../utils/detail/MonadicOperationWrappers.h        |  56 +++
 libminifi/include/utils/expected.h                 | 174 ++++++++
 libminifi/include/utils/gsl.h                      |   7 +
 libminifi/include/utils/meta/detected.h            |  52 +++
 libminifi/include/utils/net/DNS.h                  |  48 ++
 libminifi/include/utils/net/Socket.h               | 108 +++++
 libminifi/src/core/FlowFile.cpp                    |   2 +-
 libminifi/src/core/ProcessSession.cpp              |  28 ++
 libminifi/src/io/ClientSocket.cpp                  |  78 ++--
 libminifi/src/io/tls/TLSSocket.cpp                 |   2 +-
 libminifi/src/utils/NetworkInterfaceInfo.cpp       |  10 +-
 libminifi/src/utils/OsUtils.cpp                    |  34 --
 libminifi/src/utils/net/DNS.cpp                    |  94 ++++
 libminifi/src/utils/net/Socket.cpp                 |  72 +++
 libminifi/test/SingleInputTestController.h         |  99 +++++
 libminifi/test/TestBase.cpp                        |   4 +-
 libminifi/test/TestBase.h                          |   3 +-
 .../test/unit/ContentRepositoryDependentTests.h    |  35 +-
 libminifi/test/unit/ExpectedTest.cpp               | 485 +++++++++++++++++++++
 libminifi/test/unit/GeneralUtilsTest.cpp           |  15 +-
 libminifi/test/unit/NetUtilsTest.cpp               |  35 ++
 libminifi/test/unit/OptionalTest.cpp               |  11 +
 libminifi/test/unit/ProcessContextTest.cpp         |  36 ++
 48 files changed, 1895 insertions(+), 259 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index a4d924a..7c3f134 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -370,6 +370,9 @@ target_compile_definitions(gsl-lite INTERFACE 
${GslDefinitions})
 # date
 include(Date)
 
+# expected-lite
+include(ExpectedLite)
+
 # Update passthrough args used in configurations in patch commands
 if (WIN32)
        set(PASSTHROUGH_CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /w")
diff --git a/LICENSE b/LICENSE
index 5bcf14d..58467d6 100644
--- a/LICENSE
+++ b/LICENSE
@@ -3065,3 +3065,33 @@ SGI C++ Standard Template Library license
 //
 
 --------------------------------------------------------------------------
+
+This project bundles 'expected-lite' under the Boost Software License 1.0
+
+Boost Software License - Version 1.0 - August 17th, 2003
+
+Permission is hereby granted, free of charge, to any person or organization
+obtaining a copy of the software and accompanying documentation covered by
+this license (the "Software") to use, reproduce, display, distribute,
+execute, and transmit the Software, and to prepare derivative works of the
+Software, and to permit third-parties to whom the Software is furnished to
+do so, all subject to the following:
+
+The copyright notices in the Software and this entire statement, including
+the above license grant, this restriction and the following disclaimer,
+must be included in all copies of the Software, in whole or in part, and
+all derivative works of the Software, unless such copies or derivative
+works are solely in the form of machine-executable object code generated by
+a source language processor.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
+
+---------------------------------------------------------------------------
+This project reuses test code from TartanLlama/expected from the Public Domain 
or under CC0
+https://creativecommons.org/publicdomain/zero/1.0/
diff --git a/NOTICE b/NOTICE
index b8cf4a2..4715f25 100644
--- a/NOTICE
+++ b/NOTICE
@@ -58,6 +58,8 @@ This software includes third party software subject to the 
following copyrights:
 - IANA timezone database - public domain
 - date (HowardHinnant/date) - notices below
 - range-v3 - Eric Niebler and other contributors
+- expected-lite - Copyright (C) 2016-2020 Martin Moene.
+- TartanLlama/expected - public domain, thanks to Sy Brand
 
 The licenses for these third party components are included in LICENSE.txt
 
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 2445d48..5b393a0 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -50,6 +50,7 @@
 - [PutS3Object](#puts3object)
 - [PutSFTP](#putsftp)
 - [PutSQL](#putsql)
+- [PutUDP](#putudp)
 - [QueryDatabaseTable](#querydatabasetable)
 - [ReplaceText](#replacetext)
 - [RetryFlowFile](#retryflowfile)
@@ -1493,6 +1494,28 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | - | - |
 |success|After a successful SQL update operation, the incoming FlowFile sent 
here|
 
+## PutUDP
+
+### Description
+
+The PutUDP processor receives a FlowFile and packages the FlowFile content 
into a single UDP datagram packet which is then transmitted to the configured 
UDP server.
+The processor doesn't guarantee a successful transfer, even if the flow file 
is routed to the success relationship.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name         | Default Value | Allowable Values | Description                
                                                               |
+|--------------|---------------|------------------|-------------------------------------------------------------------------------------------|
+| **Hostname** | localhost     |                  | The ip address or hostname of 
the destination.<br/>**Supports Expression Language: true** |
+| **Port**     |               |                  | The port on the 
destination.<br/>**Supports Expression Language: true**                   |
+
+### Relationships
+
+| Name    | Description                                                        
           |
+|---------|-------------------------------------------------------------------------------|
+| success | FlowFiles that are sent to the destination are sent out this 
relationship.    |
+| failure | FlowFiles that encounter IO or network errors are sent out this 
relationship. |
+
 ## QueryDatabaseTable
 
 ### Description
diff --git a/README.md b/README.md
index 57a84ae..934478b 100644
--- a/README.md
+++ b/README.md
@@ -66,7 +66,7 @@ The following table lists the base set of processors.
 
 | Extension Set        | Processors           |
 | ------------- |:-------------|
-| **Base**    | 
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>
 
[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P
 [...]
+| **Base**    | 
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>
 
[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P
 [...]
 
 The next table outlines CMAKE flags that correspond with MiNiFi extensions. 
Extensions that are enabled by default ( such as CURL ), can be disabled with 
the respective CMAKE flag on the command line.
 
diff --git a/cmake/ExpectedLite.cmake b/cmake/ExpectedLite.cmake
new file mode 100644
index 0000000..c89551c
--- /dev/null
+++ b/cmake/ExpectedLite.cmake
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+include(FetchContent)
+
+FetchContent_Declare(expected-lite
+    URL      
https://github.com/martinmoene/expected-lite/archive/refs/tags/v0.5.0.tar.gz
+    URL_HASH 
SHA256=80f8c91d228cdc5cac3698141c0321d51dcdb0239c2fdcdeae7d46a9a58f2297
+)
+FetchContent_MakeAvailable(expected-lite)
diff --git a/extensions/expression-language/ProcessContextExpr.cpp 
b/extensions/expression-language/ProcessContextExpr.cpp
index 08e515b..1f6ec93 100644
--- a/extensions/expression-language/ProcessContextExpr.cpp
+++ b/extensions/expression-language/ProcessContextExpr.cpp
@@ -37,10 +37,12 @@ bool ProcessContextExpr::getProperty(const Property 
&property, std::string &valu
     }
     logger_->log_debug("Compiling expression for %s/%s: %s", 
getProcessorNode()->getName(), name, expression_str);
     expressions_.emplace(name, expression::compile(expression_str));
+    expression_strs_.insert_or_assign(name, expression_str);
   }
 
   minifi::expression::Parameters p(shared_from_this(), flow_file);
   value = expressions_[name](p).asString();
+  logger_->log_debug(R"(expression "%s" of property "%s" evaluated to: %s)", 
expression_strs_[name], name, value);
   return true;
 }
 
@@ -54,9 +56,11 @@ bool ProcessContextExpr::getDynamicProperty(const Property 
&property, std::strin
     ProcessContext::getDynamicProperty(name, expression_str);
     logger_->log_debug("Compiling expression for %s/%s: %s", 
getProcessorNode()->getName(), name, expression_str);
     dynamic_property_expressions_.emplace(name, 
expression::compile(expression_str));
+    expression_strs_.insert_or_assign(name, expression_str);
   }
   minifi::expression::Parameters p(shared_from_this(), flow_file);
   value = dynamic_property_expressions_[name](p).asString();
+  logger_->log_debug(R"(expression "%s" of dynamic property "%s" evaluated to: 
%s)", expression_strs_[name], name, value);
   return true;
 }
 
diff --git a/extensions/expression-language/ProcessContextExpr.h 
b/extensions/expression-language/ProcessContextExpr.h
index 192433d..31f48ef 100644
--- a/extensions/expression-language/ProcessContextExpr.h
+++ b/extensions/expression-language/ProcessContextExpr.h
@@ -19,7 +19,7 @@
 
 #include <ProcessContext.h>
 #include <memory>
-#include <map>
+#include <unordered_map>
 #include <string>
 #include "impl/expression/Expression.h"
 
@@ -35,7 +35,7 @@ namespace core {
  * state. With this case, we can rely on instantiation of a builder to create 
the necessary
  * ProcessContext. *
  */
-class ProcessContextExpr : public core::ProcessContext {
+class ProcessContextExpr final : public core::ProcessContext {
  public:
   /**
    std::forward of argument list did not work on all platform.
@@ -54,7 +54,7 @@ class ProcessContextExpr : public core::ProcessContext {
         logger_(logging::LoggerFactory<ProcessContextExpr>::getLogger()) {
   }
   // Destructor
-  virtual ~ProcessContextExpr() = default;
+  ~ProcessContextExpr() override = default;
   /**
    * Retrieves property using EL
    * @param property property
@@ -65,11 +65,10 @@ class ProcessContextExpr : public core::ProcessContext {
 
   bool getDynamicProperty(const Property &property, std::string &value, const 
std::shared_ptr<FlowFile> &flow_file) override;
 
- protected:
-  std::map<std::string, org::apache::nifi::minifi::expression::Expression> 
expressions_;
-  std::map<std::string, org::apache::nifi::minifi::expression::Expression> 
dynamic_property_expressions_;
-
  private:
+  std::unordered_map<std::string, 
org::apache::nifi::minifi::expression::Expression> expressions_;
+  std::unordered_map<std::string, 
org::apache::nifi::minifi::expression::Expression> 
dynamic_property_expressions_;
+  std::unordered_map<std::string, std::string> expression_strs_;
   std::shared_ptr<logging::Logger> logger_;
 };
 
diff --git a/extensions/standard-processors/processors/PutUDP.cpp 
b/extensions/standard-processors/processors/PutUDP.cpp
new file mode 100644
index 0000000..b6f0e3a
--- /dev/null
+++ b/extensions/standard-processors/processors/PutUDP.cpp
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutUDP.h"
+
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <winsock2.h>
+#else
+#include <netdb.h>
+#endif /* WIN32 */
+#include <tuple>
+#include <utility>
+
+#include "range/v3/view/join.hpp"
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "utils/net/DNS.h"
+#include "utils/StringUtils.h"
+#include "utils/net/Socket.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutUDP::Hostname = 
core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The ip address or hostname of the destination.")
+    ->withDefaultValue("localhost")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PutUDP::Port = 
core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The port on the destination. Can be a service name like 
ssh or http, as defined in /etc/services.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship PutUDP::Success{"success", "FlowFiles that are sent 
to the destination are sent out this relationship."};
+const core::Relationship PutUDP::Failure{"failure", "FlowFiles that 
encountered IO errors are send out this relationship."};
+
+PutUDP::PutUDP(const std::string& name, const utils::Identifier& uuid)
+    :Processor(name, uuid), 
logger_{core::logging::LoggerFactory<PutUDP>::getLogger()}
+{ }
+
+PutUDP::~PutUDP() = default;
+
+void PutUDP::initialize() {
+  setSupportedProperties({
+      Hostname,
+      Port
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void PutUDP::notifyStop() {}
+
+void PutUDP::onSchedule(core::ProcessContext* const context, 
core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+  }
+  if (context->getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+  }
+}
+
+void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* 
const session) {
+  gsl_Expects(context && session);
+
+  const auto flow_file = session->get();
+  if (!flow_file) {
+    yield();
+    return;
+  }
+
+  const auto hostname = context->getProperty(Hostname, 
flow_file).value_or(std::string{});
+  const auto port = context->getProperty(Port, 
flow_file).value_or(std::string{});
+  if (hostname.empty() || port.empty()) {
+    logger_->log_error("[%s] invalid target endpoint: hostname: %s, port: %s", 
flow_file->getUUIDStr(),
+        hostname.empty() ? "(empty)" : hostname.c_str(),
+        port.empty() ? "(empty)" : port.c_str());
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  const auto data = session->readBuffer(flow_file);
+  if (data.status < 0) {
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  const auto nonthrowing_sockaddr_ntop = [](const sockaddr* const sa) -> 
std::string {
+    return utils::try_expression([sa] { return utils::net::sockaddr_ntop(sa); 
}).value_or("(n/a)");
+  };
+
+  const auto debug_log_resolved_names = [&, this](const addrinfo& names) -> 
decltype(auto) {
+    if (logger_->should_log(core::logging::LOG_LEVEL::debug)) {
+      std::vector<std::string> names_vector;
+      for (const addrinfo* it = &names; it; it = it->ai_next) {
+        names_vector.push_back(nonthrowing_sockaddr_ntop(it->ai_addr));
+      }
+      logger_->log_debug("resolved \'%s\' to: %s",
+          hostname,
+          names_vector | ranges::views::join(',') | ranges::to<std::string>());
+    }
+    return names;
+  };
+
+  utils::net::resolveHost(hostname.c_str(), port.c_str(), 
utils::net::IpProtocol::Udp)
+      | utils::map(utils::dereference)
+      | utils::map(debug_log_resolved_names)
+      | utils::flatMap([](const auto& names) { return 
utils::net::open_socket(names); })
+      | utils::flatMap([&, this](utils::net::OpenSocketResult 
socket_handle_and_selected_name) -> nonstd::expected<void, std::error_code> {
+        const auto& [socket_handle, selected_name] = 
socket_handle_and_selected_name;
+        logger_->log_debug("connected to %s", 
nonthrowing_sockaddr_ntop(selected_name->ai_addr));
+#ifdef WIN32
+        const char* const buffer_ptr = reinterpret_cast<const 
char*>(data.buffer.data());
+#else
+        const void* const buffer_ptr = data.buffer.data();
+#endif
+        const auto send_result = ::sendto(socket_handle.get(), buffer_ptr, 
data.buffer.size(), 0, selected_name->ai_addr, selected_name->ai_addrlen);
+        logger_->log_trace("sendto returned %ld", 
static_cast<long>(send_result));  // NOLINT: sendto
+        if (send_result == utils::net::SocketError) {
+          return nonstd::make_unexpected(utils::net::get_last_socket_error());
+        }
+        session->transfer(flow_file, Success);
+        return {};
+      })
+      | utils::orElse([&, this](std::error_code ec) {
+        gsl_Expects(ec);
+        logger_->log_error("%s", ec.message());
+        session->transfer(flow_file, Failure);
+      });
+}
+
+REGISTER_RESOURCE(PutUDP, "The PutUDP processor receives a FlowFile and 
packages the FlowFile content into a single UDP datagram packet which is then 
transmitted to the configured UDP server. "
+                          "The processor doesn't guarantee a successful 
transfer, even if the flow file is routed to the success relationship.");
+
+}  // namespace org::apache::nifi::minifi::processors
+
diff --git a/extensions/standard-processors/processors/PutUDP.h 
b/extensions/standard-processors/processors/PutUDP.h
new file mode 100644
index 0000000..c8ba6e9
--- /dev/null
+++ b/extensions/standard-processors/processors/PutUDP.h
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "Processor.h"
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::core::logging { class Logger; }
+
+namespace org::apache::nifi::minifi::processors {
+class PutUDP final : public core::Processor {
+ public:
+  EXTENSIONAPI static const core::Property Hostname;
+  EXTENSIONAPI static const core::Property Port;
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+
+  explicit PutUDP(const std::string& name, const utils::Identifier& uuid = {});
+  PutUDP(const PutUDP&) = delete;
+  PutUDP& operator=(const PutUDP&) = delete;
+  ~PutUDP() final;
+
+  void initialize() final;
+  void notifyStop() final;
+  void onSchedule(core::ProcessContext*, core::ProcessSessionFactory *) final;
+  void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
+
+  core::annotation::Input getInputRequirement() const noexcept final { return 
core::annotation::Input::INPUT_REQUIRED; }
+  bool isSingleThreaded() const noexcept final { return true; /* for now */ }
+ private:
+  std::string hostname_;
+  std::string port_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+}  // namespace org::apache::nifi::minifi::processors
diff --git 
a/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
 
b/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
index 2ad3d01..b52a4e9 100644
--- 
a/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
+++ 
b/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
@@ -37,6 +37,7 @@
 #include "properties/Configure.h"
 #include "io/tls/TLSSocket.h"
 #include "io/tls/TLSServerSocket.h"
+#include "utils/net/Socket.h"
 
 namespace minifi = org::apache::nifi::minifi;
 
@@ -86,7 +87,7 @@ std::string str_addr(const sockaddr* const sa) {
       const auto addr_str = inet_ntop(AF_INET, &sin.sin_addr, buf, 
sizeof(buf));
 #endif
       if (!addr_str) {
-        throw std::runtime_error{minifi::io::get_last_socket_error_message()};
+        throw 
std::runtime_error{minifi::utils::net::get_last_socket_error().message()};
       }
       return std::string{addr_str};
     }
@@ -99,7 +100,7 @@ std::string str_addr(const sockaddr* const sa) {
       const auto addr_str = inet_ntop(AF_INET, &sin6.sin6_addr, buf, 
sizeof(buf));
 #endif
       if (!addr_str) {
-        throw std::runtime_error{minifi::io::get_last_socket_error_message()};
+        throw 
std::runtime_error{minifi::utils::net::get_last_socket_error().message()};
       }
       return std::string{addr_str};
     }
@@ -167,14 +168,14 @@ class SimpleSSLTestClient  {
       log_addrinfo(addr, logger);
       sfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
       if (sfd == INVALID_SOCKET) {
-        logger.log_error("socket: %s\n", 
minifi::io::get_last_socket_error_message());
+        logger.log_error("socket: %s\n", 
minifi::utils::net::get_last_socket_error().message());
         continue;
       }
       const auto connect_result = connect(sfd, addr->ai_addr, 
addr->ai_addrlen);
       if (connect_result == 0) {
         break;
       } else {
-        logger.log_error("connect to %s: %s\n", str_addr(addr->ai_addr), 
minifi::io::get_last_socket_error_message());
+        logger.log_error("connect to %s: %s\n", str_addr(addr->ai_addr), 
minifi::utils::net::get_last_socket_error().message());
       }
       sfd = INVALID_SOCKET;
 #ifdef WIN32
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp 
b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
new file mode 100644
index 0000000..787ebe0
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleInputTestController.h"
+#include "PutUDP.h"
+#include "utils/net/DNS.h"
+#include "utils/net/Socket.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+namespace {
+struct DatagramListener {
+  DatagramListener(const char* const hostname, const char* const port)
+    :resolved_names_{utils::net::resolveHost(hostname, port, 
utils::net::IpProtocol::Udp).value()},
+     open_socket_{utils::net::open_socket(*resolved_names_)
+        | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw 
std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ", 
hostname, " on port ", port)}; })}
+  {
+    const auto bind_result = bind(open_socket_.socket_.get(), 
open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen);
+    if (bind_result == utils::net::SocketError) {
+      throw std::runtime_error{utils::StringUtils::join_pack("bind: ", 
utils::net::get_last_socket_error().message())};
+    }
+  }
+
+  struct ReceiveResult {
+    std::string remote_address;
+    std::string message;
+  };
+
+  [[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192) 
const {
+    ReceiveResult result;
+    result.message.resize(max_message_size);
+    sockaddr_storage remote_address{};
+    socklen_t addrlen = sizeof(remote_address);
+    const auto recv_result = recvfrom(open_socket_.socket_.get(), 
result.message.data(), result.message.size(), 0, 
std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen);
+    if (recv_result == utils::net::SocketError) {
+      throw std::runtime_error{utils::StringUtils::join_pack("recvfrom: ", 
utils::net::get_last_socket_error().message())};
+    }
+    result.message.resize(gsl::narrow<size_t>(recv_result));
+    result.remote_address = 
utils::net::sockaddr_ntop(std::launder(reinterpret_cast<sockaddr*>(&remote_address)));
+    return result;
+  }
+
+  std::unique_ptr<addrinfo, utils::net::addrinfo_deleter> resolved_names_;
+  utils::net::OpenSocketResult open_socket_;
+};
+}  // namespace
+
+// Testing the failure relationship is not required: since UDP in general 
offers no delivery guarantees, flow files are always routed to success, unless 
there is
+// some weird IO error with the content repo.
+TEST_CASE("PutUDP", "[putudp]") {
+  const auto putudp = std::make_shared<PutUDP>("PutUDP");
+  auto random_engine = std::mt19937{std::random_device{}()};  // NOLINT: 
"Missing space before {  [whitespace/braces] [5]"
+  // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding 
to those
+  const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 - 
1}(random_engine);
+  const auto port_str = std::to_string(port);
+
+  test::SingleInputTestController controller{putudp};
+  LogTestController::getInstance().setTrace<PutUDP>();
+  LogTestController::getInstance().setTrace<core::ProcessContext>();
+  LogTestController::getInstance().setLevelByClassName(spdlog::level::trace, 
"org::apache::nifi::minifi::core::ProcessContextExpr");
+  putudp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
+  putudp->setProperty(PutUDP::Port, 
utils::StringUtils::join_pack("${literal('", port_str, "')}"));
+
+  DatagramListener listener{"localhost", port_str.c_str()};
+
+  {
+    const char* const message = "first message: hello";
+    const auto result = controller.trigger(message);
+    const auto& success_flow_files = result.at(PutUDP::Success);
+    REQUIRE(success_flow_files.size() == 1);
+    REQUIRE(result.at(PutUDP::Failure).empty());
+    REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
+    auto receive_result = listener.receive();
+    REQUIRE(receive_result.message == message);
+    REQUIRE(!receive_result.remote_address.empty());
+  }
+
+  {
+    const char* const message = "longer message 
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
 [...]
+    const auto result = controller.trigger(message);
+    const auto& success_flow_files = result.at(PutUDP::Success);
+    REQUIRE(success_flow_files.size() == 1);
+    REQUIRE(result.at(PutUDP::Failure).empty());
+    REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
+    auto receive_result = listener.receive();
+    REQUIRE(receive_result.message == message);
+    REQUIRE(!receive_result.remote_address.empty());
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp 
b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 8e7eb08..ab99255 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -586,7 +586,7 @@ TEST_CASE("TailFile can handle input files getting 
removed", "[multiple_file]")
 TEST_CASE("TailFile processes a very long line correctly", "[simple]") {
   std::string line1("012\n");
   std::string line2(8050, 0);
-  std::mt19937 gen(std::random_device{}());  // NOLINT (linter wants a space 
before '{')
+  std::mt19937 gen(std::random_device{}());  // NOLINT (linter wants a space 
before '{') [whitespace/braces]
   std::generate_n(line2.begin(), line2.size() - 1, [&]() -> char {
     // Make sure to only generate from characters that don't intersect with 
line1 and 3-4
     // Starting generation from 64 ensures that no numeric digit characters 
are added
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 6f96f96..d10b884 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -57,7 +57,7 @@ if (NOT OPENSSL_OFF)
        set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES "src/agent/agent_docs.cpp" "src/properties/*.cpp" 
"src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp" 
"src/core/logging/internal/*.cpp" "src/core/state/*.cpp" 
"src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" 
"src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} 
"src/core/controller/*.cpp" "src/controllers/*.cpp" 
"src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" 
"src/core/yaml/*.cpp" " [...]
+file(GLOB SOURCES "src/agent/agent_docs.cpp" "src/properties/*.cpp" 
"src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp" 
"src/core/logging/internal/*.cpp" "src/core/state/*.cpp" 
"src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" 
"src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} 
"src/core/controller/*.cpp" "src/controllers/*.cpp" 
"src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" 
"src/core/yaml/*.cpp" " [...]
 # manually add this as it might not yet be present when this executes
 list(APPEND SOURCES "src/agent/agent_version.cpp")
 
@@ -98,7 +98,7 @@ if(NOT EXCLUDE_BOOST)
 endif()
 
 include(RangeV3)
-list(APPEND LIBMINIFI_LIBRARIES yaml-cpp ZLIB::ZLIB concurrentqueue RapidJSON 
spdlog cron Threads::Threads gsl-lite libsodium range-v3)
+list(APPEND LIBMINIFI_LIBRARIES yaml-cpp ZLIB::ZLIB concurrentqueue RapidJSON 
spdlog cron Threads::Threads gsl-lite libsodium range-v3 expected-lite)
 if(NOT WIN32)
        list(APPEND LIBMINIFI_LIBRARIES OSSP::libuuid++)
 endif()
diff --git a/libminifi/include/core/ConfigurableComponent.h 
b/libminifi/include/core/ConfigurableComponent.h
index 86d588a..250ea4f 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -217,21 +217,21 @@ template<typename T>
 bool ConfigurableComponent::getProperty(const std::string name, T &value) 
const {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
 
-  auto &&it = properties_.find(name);
-  if (it != properties_.end()) {
-    const Property& item = it->second;
-    if (item.getValue().getValue() == nullptr) {
+  const auto property_name_and_object = properties_.find(name);
+  if (property_name_and_object != properties_.end()) {
+    const Property& property = property_name_and_object->second;
+    if (property.getValue().getValue() == nullptr) {
       // empty value
-      if (item.getRequired()) {
-        logger_->log_error("Component %s required property %s is empty", name, 
item.getName());
-        throw utils::internal::RequiredPropertyMissingException("Required 
property is empty: " + item.getName());
+      if (property.getRequired()) {
+        logger_->log_error("Component %s required property %s is empty", name, 
property.getName());
+        throw utils::internal::RequiredPropertyMissingException("Required 
property is empty: " + property.getName());
       }
-      logger_->log_debug("Component %s property name %s, empty value", name, 
item.getName());
+      logger_->log_debug("Component %s property name %s, empty value", name, 
property.getName());
       return false;
     }
-    logger_->log_debug("Component %s property name %s value %s", name, 
item.getName(), item.getValue().to_string());
+    logger_->log_debug("Component %s property name %s value %s", name, 
property.getName(), property.getValue().to_string());
     // cast throws if the value is invalid
-    value = static_cast<T>(item.getValue());
+    value = static_cast<T>(property.getValue());
     return true;
   } else {
     logger_->log_warn("Could not find property %s", name);
diff --git a/libminifi/include/core/FlowFile.h 
b/libminifi/include/core/FlowFile.h
index 15a3b94..6a794f9 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -53,7 +53,7 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
    * Returns a pointer to this flow file record's
    * claim
    */
-  std::shared_ptr<ResourceClaim> getResourceClaim();
+  [[nodiscard]] std::shared_ptr<ResourceClaim> getResourceClaim() const;
   /**
    * Sets _claim to the inbound claim argument
    */
@@ -95,7 +95,7 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
    * is marked as deleted.
    * @return marked deleted
    */
-  bool isDeleted() const;
+  [[nodiscard]] bool isDeleted() const;
 
   /**
    * Sets whether to mark this flow file record
@@ -108,24 +108,24 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
    * Get entry date for this record
    * @return entry date uint64_t
    */
-  uint64_t getEntryDate() const;
+  [[nodiscard]] uint64_t getEntryDate() const;
 
   /**
    * Gets the event time.
    * @return event time.
    */
-  uint64_t getEventTime() const;
+  [[nodiscard]] uint64_t getEventTime() const;
   /**
    * Get lineage start date
    * @return lineage start date uint64_t
    */
-  uint64_t getlineageStartDate() const;
+  [[nodiscard]] uint64_t getlineageStartDate() const;
 
   /**
    * Sets the lineage start date
    * @param date new lineage start date
    */
-  void setLineageStartDate(const uint64_t date);
+  void setLineageStartDate(uint64_t date);
 
   void setLineageIdentifiers(const std::vector<utils::Identifier>& 
lineage_Identifiers) {
     lineage_Identifiers_ = lineage_Identifiers;
@@ -139,7 +139,7 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
    */
   bool getAttribute(const std::string& key, std::string& value) const;
 
-  std::optional<std::string> getAttribute(const std::string& key) const;
+  [[nodiscard]] std::optional<std::string> getAttribute(const std::string& 
key) const;
 
   /**
    * Updates the value in the attribute map that corresponds
@@ -171,7 +171,7 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
    * Returns the map of attributes
    * @return attributes.
    */
-  std::map<std::string, std::string> getAttributes() const {
+  [[nodiscard]] std::map<std::string, std::string> getAttributes() const {
     return {attributes_.begin(), attributes_.end()};
   }
 
@@ -200,7 +200,7 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
    * Returns the size of corresponding flow file
    * @return size as a uint64_t
    */
-  uint64_t getSize() const;
+  [[nodiscard]] uint64_t getSize() const;
 
   /**
    * Sets the offset
@@ -215,7 +215,7 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
     to_be_processed_after_ = std::chrono::steady_clock::now() + duration;
   }
 
-  std::chrono::time_point<std::chrono::steady_clock> getPenaltyExpiration() 
const {
+  [[nodiscard]] std::chrono::time_point<std::chrono::steady_clock> 
getPenaltyExpiration() const {
     return to_be_processed_after_;
   }
 
@@ -223,13 +223,13 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
    * Gets the offset within the flow file
    * @return size as a uint64_t
    */
-  uint64_t getOffset() const;
+  [[nodiscard]] uint64_t getOffset() const;
 
-  bool isPenalized() const {
+  [[nodiscard]] bool isPenalized() const {
     return to_be_processed_after_ > std::chrono::steady_clock::now();
   }
 
-  uint64_t getId() const {
+  [[nodiscard]] uint64_t getId() const {
     return id_;
   }
 
@@ -242,13 +242,13 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
    * Returns the original connection referenced by this record.
    * @return shared original connection pointer.
    */
-  std::shared_ptr<core::Connectable> getConnection() const;
+  [[nodiscard]] std::shared_ptr<core::Connectable> getConnection() const;
 
   void setStoredToRepository(bool storedInRepository) {
     stored = storedInRepository;
   }
 
-  bool isStored() const {
+  [[nodiscard]] bool isStored() const {
     return stored;
   }
 
diff --git a/libminifi/include/core/ProcessContext.h 
b/libminifi/include/core/ProcessContext.h
index 8ce37f6..271f70c 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -113,10 +113,14 @@ class ProcessContext : public 
controller::ControllerServiceLookup, public core::
   }
 
   template<typename T>
-  bool getProperty(const std::string &name, T &value) const {
+  std::enable_if_t<!std::is_convertible_v<T&, const FlowFile&> && 
!std::is_convertible_v<T&, const std::shared_ptr<FlowFile>&>,
+      bool> getProperty(const std::string &name, T &value) const {
     return getPropertyImp<typename std::common_type<T>::type>(name, value);
   }
 
+  template<typename T = std::string>
+  std::enable_if_t<std::is_default_constructible_v<T>, std::optional<T>> 
getProperty(const Property&, const std::shared_ptr<FlowFile>&);
+
   virtual bool getProperty(const Property &property, std::string &value, const 
std::shared_ptr<FlowFile>& /*flow_file*/) {
     return getProperty(property.getName(), value);
   }
@@ -394,6 +398,26 @@ class ProcessContext : public 
controller::ControllerServiceLookup, public core::
   bool initialized_;
 };
 
+template<typename T>  // Typed lookup: fetches the property as a string and converts it to T; std::nullopt on any failure.
+inline std::enable_if_t<std::is_default_constructible_v<T>, std::optional<T>> 
ProcessContext::getProperty(const Property& property, const 
std::shared_ptr<FlowFile>& flow_file) {
+  T value;  // conversion target, hence the default-constructible requirement in the return type
+  std::string string_value;
+  if (!getProperty(property, string_value, flow_file)) return std::nullopt;  // delegates to the std::string& overload
+  try {
+    if (!state::response::Value{string_value}.template convertValue(value)) 
return std::nullopt;  // convertValue reported failure
+  } catch (const utils::internal::ValueException&) {
+    return std::nullopt;  // a ValueException during conversion is reported as "no value" rather than propagated
+  }
+  return value;
+}
+
+template<>  // std::string specialization: no conversion step, the fetched string is returned directly.
+inline std::optional<std::string> 
ProcessContext::getProperty<std::string>(const Property& property, const 
std::shared_ptr<FlowFile>& flow_file) {
+  std::string value;
+  if (!getProperty(property, value, flow_file)) return std::nullopt;  // lookup via the std::string& overload failed
+  return value;
+}
+
 }  // namespace core
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index af54e95..63391f8 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -44,6 +44,14 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
+namespace detail {
+struct ReadBufferResult {  // return type of ProcessSession::readBuffer
+  int64_t status;  // NOTE(review): presumably the result code/size of the underlying read - confirm against the implementation
+  std::vector<std::byte> buffer;  // NOTE(review): presumably the bytes that were read - confirm
+};
+
+std::string to_string(const ReadBufferResult& read_buffer_result);  // textual form of a ReadBufferResult; only declared here
+}  // namespace detail
 
 // ProcessSession Class
 class ProcessSession : public ReferenceContainer {
@@ -88,10 +96,11 @@ class ProcessSession : public ReferenceContainer {
   void remove(const std::shared_ptr<core::FlowFile> &flow);
   // Execute the given read callback against the content
   int64_t read(const std::shared_ptr<core::FlowFile> &flow, 
InputStreamCallback *callback);
-
   int64_t read(const std::shared_ptr<core::FlowFile> &flow, 
InputStreamCallback&& callback) {
     return read(flow, &callback);
   }
+  // Read the content of the given flow file into a buffer; the buffer is returned together with a status in detail::ReadBufferResult
+  detail::ReadBufferResult readBuffer(const std::shared_ptr<core::FlowFile>& 
flow);
   // Execute the given write callback against the content
   void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback 
*callback);
 
diff --git a/libminifi/include/core/Relationship.h 
b/libminifi/include/core/Relationship.h
index bda280e..39aba57 100644
--- a/libminifi/include/core/Relationship.h
+++ b/libminifi/include/core/Relationship.h
@@ -62,7 +62,13 @@ class Relationship {
   std::string name_ = "undefined";
   std::string description_;
 };
-
 }  // namespace org::apache::nifi::minifi::core
 
+template<>  // std::hash support so Relationship can be used with hash-based standard containers
+struct std::hash<org::apache::nifi::minifi::core::Relationship> {
+  size_t operator()(const org::apache::nifi::minifi::core::Relationship& 
relationship) const noexcept {
+    return std::hash<std::string>{}(relationship.getName());  // the hash depends on the name only
+  }
+};
+
 #endif  // LIBMINIFI_INCLUDE_CORE_RELATIONSHIP_H_
diff --git a/libminifi/include/io/ClientSocket.h 
b/libminifi/include/io/ClientSocket.h
index 85ea6b9..966dc71 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -20,17 +20,6 @@
 
 #include <utility>
 #include <cstdint>
-#ifdef WIN32
-#ifndef WIN32_LEAN_AND_MEAN
-#define WIN32_LEAN_AND_MEAN
-#endif /* WIN32_LEAN_AND_MEAN */
-#include <WinSock2.h>
-#else
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <unistd.h>
-#endif /* WIN32 */
 #include <mutex>
 #include <atomic>
 #include <string>
@@ -43,6 +32,7 @@
 #include "io/validation.h"
 #include "properties/Configure.h"
 #include "io/NetworkPrioritizer.h"
+#include "utils/net/Socket.h"
 
 namespace org {
 namespace apache {
@@ -50,22 +40,10 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-#ifdef WIN32
-using SocketDescriptor = SOCKET;
-using ip4addr = in_addr;
-#else
-using SocketDescriptor = int;
-using ip4addr = in_addr_t;
-#undef INVALID_SOCKET
-static constexpr SocketDescriptor INVALID_SOCKET = -1;
-#undef SOCKET_ERROR
-static constexpr int SOCKET_ERROR = -1;
-#endif /* WIN32 */
-
-/**
- * Return the last socket error message, based on errno on posix and 
WSAGetLastError() on windows
- */
-std::string get_last_socket_error_message();
+using utils::net::SocketDescriptor;
+using utils::net::ip4addr;
+using utils::net::InvalidSocket;
+using utils::net::SocketError;
 
 /**
  * @return >= 0 on posix, != INVALID_SOCKET on windows
@@ -221,7 +199,7 @@ class Socket : public BaseStream {
   io::NetworkInterface local_network_interface_;
 
   // connection information
-  SocketDescriptor socket_file_descriptor_{ INVALID_SOCKET };  // -1 on posix
+  SocketDescriptor socket_file_descriptor_{ InvalidSocket };  // -1 on posix
 
   fd_set total_list_{};
   fd_set read_fds_{};
diff --git a/libminifi/include/utils/Deleters.h 
b/libminifi/include/utils/Deleters.h
index 9ae11e2..e33c3b8 100644
--- a/libminifi/include/utils/Deleters.h
+++ b/libminifi/include/utils/Deleters.h
@@ -21,12 +21,10 @@
 #ifdef WIN32
 #include <WS2tcpip.h>
 #else
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
 #include <ifaddrs.h>
 #endif /* WIN32 */
 #include <utility>
+#include "utils/net/DNS.h"
 
 namespace org {
 namespace apache {
@@ -66,11 +64,7 @@ struct StackAwareDeleter {
   D impl_;
 };
 
-struct addrinfo_deleter {
-  void operator()(addrinfo* const p) const noexcept {
-    freeaddrinfo(p);
-  }
-};
+using net::addrinfo_deleter;
 
 #ifndef WIN32
 struct ifaddrs_deleter {
diff --git a/libminifi/include/utils/GeneralUtils.h 
b/libminifi/include/utils/GeneralUtils.h
index a433403..5d8def7 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -92,12 +92,17 @@ struct EnableSharedFromThis : virtual 
internal::EnableSharedFromThisBase {
 
 namespace detail {
 struct dereference_t {
+  template<typename T, typename = 
std::enable_if_t<is_not_null_v<std::decay_t<T>>>>
+  decltype(auto) operator()(T&& ptr) const noexcept { return 
*std::forward<T>(ptr); }
+};
+struct unsafe_dereference_t {
   template<typename T>
-  T &operator()(T *ptr) const noexcept { return *ptr; }
+  decltype(auto) operator()(T&& ptr) const noexcept { return 
*std::forward<T>(ptr); }
 };
 }  // namespace detail
 
 constexpr detail::dereference_t dereference{};
+constexpr detail::unsafe_dereference_t unsafe_dereference{};
 
 #if __cpp_lib_remove_cvref >= 201711L
 using std::remove_cvref_t;
diff --git a/libminifi/include/utils/OptionalUtils.h 
b/libminifi/include/utils/OptionalUtils.h
index cee564a..992493e 100644
--- a/libminifi/include/utils/OptionalUtils.h
+++ b/libminifi/include/utils/OptionalUtils.h
@@ -25,16 +25,13 @@
 
 #include "utils/GeneralUtils.h"
 #include "utils/gsl.h"
+#include "utils/detail/MonadicOperationWrappers.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
 
 template<typename T>
-std::optional<utils::remove_cvref_t<T>> optional_from_ptr(T&& obj) {
-  return obj == nullptr ? std::nullopt : 
std::optional<utils::remove_cvref_t<T>>{ std::forward<T>(obj) };
+std::optional<gsl::not_null<utils::remove_cvref_t<T>>> optional_from_ptr(T&& 
obj) {
+  return obj == nullptr ? std::nullopt : 
std::optional<gsl::not_null<utils::remove_cvref_t<T>>>{ 
gsl::make_not_null(std::forward<T>(obj)) };
 }
 
 template<typename, typename = void>
@@ -44,11 +41,6 @@ template<typename T>
 struct is_optional<std::optional<T>, void> : std::true_type {};
 
 namespace detail {
-template<typename T>
-struct map_wrapper {
-  T function;
-};
-
 // map implementation
 template<typename SourceType, typename F>
 auto operator|(const std::optional<SourceType>& o, map_wrapper<F> f) 
noexcept(noexcept(std::invoke(std::forward<F>(f.function), *o)))
@@ -70,11 +62,6 @@ auto operator|(std::optional<SourceType>&& o, map_wrapper<F> 
f) noexcept(noexcep
   }
 }
 
-template<typename T>
-struct flat_map_wrapper {
-  T function;
-};
-
 // flatMap implementation
 template<typename SourceType, typename F>
 auto operator|(const std::optional<SourceType>& o, flat_map_wrapper<F> f) 
noexcept(noexcept(std::invoke(std::forward<F>(f.function), *o)))
@@ -97,11 +84,6 @@ auto operator|(std::optional<SourceType>&& o, 
flat_map_wrapper<F> f) noexcept(no
   }
 }
 
-template<typename T>
-struct or_else_wrapper {
-  T function;
-};
-
 // orElse implementation
 template<typename SourceType, typename F>
 auto operator|(std::optional<SourceType> o, or_else_wrapper<F> f) 
noexcept(noexcept(std::invoke(std::forward<F>(f.function))))
@@ -123,22 +105,19 @@ auto operator|(std::optional<SourceType> o, 
or_else_wrapper<F> f) noexcept(noexc
     return std::invoke(std::forward<F>(f.function));
   }
 }
-}  // namespace detail
 
-template<typename T>
-detail::map_wrapper<T&&> map(T&& func) noexcept { return 
{std::forward<T>(func)}; }
-
-template<typename T>
-detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return 
{std::forward<T>(func)}; }
-
-template<typename T>
-detail::or_else_wrapper<T&&> orElse(T&& func) noexcept { return 
{std::forward<T>(func)}; }
-
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+// valueOrElse implementation: returns the contained value if there is one, otherwise whatever invoking the wrapped function (with no arguments) returns
+template<typename SourceType, typename F>
+auto operator|(std::optional<SourceType> o, value_or_else_wrapper<F> f) 
noexcept(noexcept(std::invoke(std::forward<F>(f.function))))
+    -> std::common_type_t<SourceType, 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>> {  // both branches convert to this common type
+  if (o) {
+    return *std::move(o);
+  } else {
+    return std::invoke(std::forward<F>(f.function));
+  }
+}
+}  // namespace detail
+}  // namespace org::apache::nifi::minifi::utils
 
 #endif  // LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_
 
diff --git a/libminifi/include/utils/OsUtils.h 
b/libminifi/include/utils/OsUtils.h
index bf3615a..a9b6253 100644
--- a/libminifi/include/utils/OsUtils.h
+++ b/libminifi/include/utils/OsUtils.h
@@ -54,7 +54,6 @@ std::string getMachineArchitecture();
 extern std::string resolve_common_identifiers(const std::string &id);
 #endif
 
-std::string sockaddr_ntop(const sockaddr* const sa);
 
 } /* namespace OsUtils */
 } /* namespace utils */
diff --git a/libminifi/include/utils/StringUtils.h 
b/libminifi/include/utils/StringUtils.h
index a4e4e1c..7976097 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -36,6 +36,7 @@
 #endif
 #include "utils/FailurePolicy.h"
 #include "utils/gsl.h"
+#include "utils/meta/detected.h"
 
 namespace org::apache::nifi::minifi {
 namespace utils {
@@ -213,24 +214,6 @@ class StringUtils {
   static size_t size(const std::basic_string_view<CharT>& str) noexcept { 
return str.size(); }
 
   struct detail {
-    // partial detection idiom impl, from cppreference.com
-    struct nonesuch{};
-
-    template<typename Default, typename Void, template<class...> class Op, 
typename... Args>
-    struct detector {
-      using value_t = std::false_type;
-      using type = Default;
-    };
-
-    template<typename Default, template<class...> class Op, typename... Args>
-    struct detector<Default, std::void_t<Op<Args...>>, Op, Args...> {
-      using value_t = std::true_type;
-      using type = Op<Args...>;
-    };
-
-    template<template<class...> class Op, typename... Args>
-    using is_detected = typename detector<nonesuch, void, Op, 
Args...>::value_t;
-
     // implementation detail of join_pack
     template<typename CharT>
     struct str_detector {
@@ -239,7 +222,7 @@ class StringUtils {
     };
 
     template<typename ResultT, typename CharT, typename... Strs>
-    using valid_string_pack_t = 
std::enable_if_t<(is_detected<str_detector<CharT>::template valid_string_t, 
Strs>::value && ...), ResultT>;
+    using valid_string_pack_t = 
std::enable_if_t<(meta::is_detected_v<str_detector<CharT>::template 
valid_string_t, Strs> && ...), ResultT>;
 
     template<typename CharT, typename... Strs, valid_string_pack_t<void, 
CharT, Strs...>* = nullptr>
     static std::basic_string<CharT> join_pack(const Strs&... strs) {
diff --git a/libminifi/include/utils/detail/MonadicOperationWrappers.h 
b/libminifi/include/utils/detail/MonadicOperationWrappers.h
new file mode 100644
index 0000000..8950eb8
--- /dev/null
+++ b/libminifi/include/utils/detail/MonadicOperationWrappers.h
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <utility>
+
+namespace org::apache::nifi::minifi::utils {
+namespace detail {  // one tag type per pipe operation; the operator| overloads pick the operation by wrapper type
+template<typename T>
+struct map_wrapper {
+  T function;
+};
+
+template<typename T>
+struct flat_map_wrapper {
+  T function;
+};
+
+template<typename T>
+struct or_else_wrapper {
+  T function;
+};
+
+template<typename T>
+struct value_or_else_wrapper {
+  T function;
+};
+
+}  // namespace detail
+
+template<typename T>  // the factories below wrap func as T&& (a reference, not a copy), so a wrapper made from a temporary must be used within the same full-expression
+detail::map_wrapper<T&&> map(T&& func) noexcept { return 
{std::forward<T>(func)}; }
+
+template<typename T>
+detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return 
{std::forward<T>(func)}; }
+
+template<typename T>
+detail::or_else_wrapper<T&&> orElse(T&& func) noexcept { return 
{std::forward<T>(func)}; }
+
+template<typename T>
+detail::value_or_else_wrapper<T&&> valueOrElse(T&& func) noexcept { return 
{std::forward<T>(func)}; }
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/expected.h 
b/libminifi/include/utils/expected.h
new file mode 100644
index 0000000..c3bdce5
--- /dev/null
+++ b/libminifi/include/utils/expected.h
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <type_traits>
+#include <utility>
+#include "nonstd/expected.hpp"
+#include "utils/detail/MonadicOperationWrappers.h"
+#include "utils/GeneralUtils.h"
+#include "utils/meta/detected.h"
+
+namespace org::apache::nifi::minifi::utils {
+namespace detail {
+
+template<typename T>
+inline constexpr bool is_expected_v = false;  // primary: T is not a nonstd::expected
+
+template<typename V, typename E>
+inline constexpr bool is_expected_v<nonstd::expected<V, E>> = true;  // matches only an unqualified nonstd::expected<V, E>; the users below strip cv/ref first
+
+// map implementation: on a value, invokes f.function (with the value, or with no arguments for expected<void, E>) and wraps the result in an expected with the same error type; an error is passed through without invoking f.function
+template<typename Expected, typename F, typename = 
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
+auto operator|(Expected&& object, map_wrapper<F> f) {
+  using value_type = typename utils::remove_cvref_t<Expected>::value_type;
+  using error_type = typename utils::remove_cvref_t<Expected>::error_type;
+  if constexpr (std::is_same_v<value_type, void>) {  // expected<void, E>: there is no value to pass to the function
+    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
+    if (!object.has_value()) {
+      return nonstd::expected<function_return_type, 
error_type>{nonstd::unexpect, std::forward<Expected>(object).error()};
+    }
+    if constexpr (std::is_same_v<function_return_type, void>) {  // void function: the result is expected<void, E>
+      std::invoke(std::forward<F>(f.function));
+      return nonstd::expected<void, error_type>{};
+    } else {
+      return nonstd::expected<function_return_type, 
error_type>{std::invoke(std::forward<F>(f.function))};
+    }
+  } else {  // non-void value: the function receives the contained value
+    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function), 
*std::forward<Expected>(object)))>;
+    if (!object.has_value()) {
+      return nonstd::expected<function_return_type, 
error_type>{nonstd::unexpect, std::forward<Expected>(object).error()};
+    }
+    if constexpr (std::is_same_v<function_return_type, void>) {  // void function: the result is expected<void, E>
+      std::invoke(std::forward<F>(f.function), 
*std::forward<Expected>(object));
+      return nonstd::expected<void, error_type>{};
+    } else {
+      return nonstd::expected<function_return_type, 
error_type>{std::invoke(std::forward<F>(f.function), 
*std::forward<Expected>(object))};
+    }
+  }
+}
+
+// flatMap: like map, but f.function must itself return an expected, which becomes the result directly; an error is re-wrapped into the function's return type without invoking f.function
+template<typename Expected, typename F, typename = 
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
+auto operator|(Expected&& object, flat_map_wrapper<F> f) {
+  using value_type = typename utils::remove_cvref_t<Expected>::value_type;
+  if constexpr (std::is_same_v<value_type, void>) {  // expected<void, E>: the function is invoked with no arguments
+    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
+    static_assert(is_expected_v<function_return_type>, "flatMap expects a 
function returning expected");
+    if (object.has_value()) {
+      return std::invoke(std::forward<F>(f.function));
+    } else {
+      return function_return_type{nonstd::unexpect, 
std::forward<Expected>(object).error()};
+    }
+  } else {  // non-void value: the function receives the contained value
+    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function), 
*std::forward<Expected>(object)))>;
+    static_assert(is_expected_v<function_return_type>, "flatMap expects a 
function returning expected");
+    if (object.has_value()) {
+      return std::invoke(std::forward<F>(f.function), 
*std::forward<Expected>(object));
+    } else {
+      return function_return_type{nonstd::unexpect, 
std::forward<Expected>(object).error()};
+    }
+  }
+}
+
+template<typename Func, typename... Args>
+using invocable_detector = decltype(std::invoke(std::declval<Func>(), 
std::declval<Args>()...));  // well-formed only if Func can be invoked with Args...; used with meta::is_detected_v below
+
+// orElse: an expected holding a value is returned as is; on error, f.function is invoked (with the error if it accepts one, otherwise with no arguments) and must return either an expected, which becomes the result, or void, in which case the original object is returned
+template<typename Expected, typename F, typename = 
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
+auto operator|(Expected&& object, or_else_wrapper<F> f) {
+  using error_type = typename utils::remove_cvref_t<Expected>::error_type;
+  if (object.has_value()) {
+    return std::forward<Expected>(object);
+  }
+  constexpr bool invocable_with_argument = 
meta::is_detected_v<invocable_detector, F, error_type>;
+  if constexpr (std::is_same_v<error_type, void> || !invocable_with_argument) {  // handler does not take the error
+    constexpr bool invocable_with_no_argument = 
meta::is_detected_v<invocable_detector, F>;
+    static_assert(invocable_with_no_argument);
+    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
+    static_assert(is_expected_v<function_return_type> || 
std::is_same_v<function_return_type, void>, "orElse expects a function 
returning expected or void");
+    if constexpr (std::is_same_v<function_return_type, void>) {
+      std::invoke(std::forward<F>(f.function));
+      return std::forward<Expected>(object);
+    } else {
+      return std::invoke(std::forward<F>(f.function));
+    }
+  } else {  // handler takes the error as its argument
+    static_assert(invocable_with_argument);
+    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error()))>;
+    static_assert(is_expected_v<function_return_type> || 
std::is_same_v<function_return_type, void>, "orElse expects a function 
returning expected or void");
+    if constexpr (std::is_same_v<function_return_type, void>) {
+      std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error());
+      return std::forward<Expected>(object);
+    } else {
+      return std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error());
+    }
+  }
+}
+
+// valueOrElse: returns the contained value; on error, f.function is invoked (with the error if it accepts one, otherwise with no arguments) and its result is used to construct value_type, or a value-initialized value_type is returned if the function returns void
+template<typename Expected, typename F, typename = 
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
+typename utils::remove_cvref_t<Expected>::value_type operator|(Expected&& 
object, value_or_else_wrapper<F> f) {
+  using value_type = typename utils::remove_cvref_t<Expected>::value_type;
+  using error_type = typename utils::remove_cvref_t<Expected>::error_type;
+  if (object.has_value()) {
+    return *std::forward<Expected>(object);
+  }
+  constexpr bool invocable_with_argument = 
meta::is_detected_v<invocable_detector, F, error_type>;
+  if constexpr (std::is_same_v<error_type, void> || !invocable_with_argument) {  // handler does not take the error
+    constexpr bool invocable_with_no_argument = 
meta::is_detected_v<invocable_detector, F>;
+    static_assert(invocable_with_no_argument);
+    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
+    static_assert((std::is_same_v<function_return_type, void> && 
std::is_default_constructible_v<value_type>) || 
std::is_constructible_v<value_type, function_return_type>,
+        "valueOrElse expects a function returning value_type or void");
+    if constexpr (std::is_same_v<function_return_type, void>) {
+      std::invoke(std::forward<F>(f.function));
+      return value_type{};
+    } else {
+      return value_type{std::invoke(std::forward<F>(f.function))};
+    }
+  } else {  // handler takes the error as its argument
+    static_assert(invocable_with_argument);
+    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error()))>;
+    static_assert((std::is_same_v<function_return_type, void> && 
std::is_default_constructible_v<value_type>) || 
std::is_constructible_v<value_type, function_return_type>,
+        "valueOrElse expects a function returning value_type or void");
+    if constexpr (std::is_same_v<function_return_type, void>) {
+      std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error());
+      return value_type{};
+    } else {
+      return value_type{std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error())};
+    }
+  }
+}
+}  // namespace detail
+
+template<typename F, typename... Args>  // Invokes action(args...) and returns the outcome as an expected: the result on success, or the thrown exception captured as a std::exception_ptr.
+auto try_expression(F&& action, Args&&... args) noexcept {
+  using action_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(action), 
std::forward<Args>(args)...))>;
+  using return_type = nonstd::expected<action_return_type, std::exception_ptr>;
+  try {
+    if constexpr (std::is_same_v<action_return_type, void>) {  // void action: success is an empty expected<void, ...>
+      std::invoke(std::forward<F>(action), std::forward<Args>(args)...);
+      return return_type{};
+    } else {
+      return return_type{std::invoke(std::forward<F>(action), 
std::forward<Args>(args)...)};
+    }
+  } catch (...) {  // every exception type is caught and stored, none propagates to the caller
+    return return_type{nonstd::unexpect, std::current_exception()};
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/gsl.h b/libminifi/include/utils/gsl.h
index 9bd4f09..01aae19 100644
--- a/libminifi/include/utils/gsl.h
+++ b/libminifi/include/utils/gsl.h
@@ -46,6 +46,13 @@ Container<detail::remove_cvref_t<T>> span_to(gsl::span<T> 
span) {
       "The destination container must have an iterator (pointer) range 
constructor");
   return span_to<Container<detail::remove_cvref_t<T>>>(span);
 }
+
+// Trait that is true only for gsl::not_null<T> specializations (cv-qualified not_null types do not match).
+template<typename T>
+inline constexpr bool is_not_null_v = false;
+template<typename T>
+inline constexpr bool is_not_null_v<gsl::not_null<T>> = true;
+template<typename T>
+struct is_not_null : std::bool_constant<is_not_null_v<T>> {};
 }  // namespace utils
 
 }  // namespace minifi
diff --git a/libminifi/include/utils/meta/detected.h 
b/libminifi/include/utils/meta/detected.h
new file mode 100644
index 0000000..844d3c1
--- /dev/null
+++ b/libminifi/include/utils/meta/detected.h
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <type_traits>
+
+namespace org::apache::nifi::minifi::utils::meta {
+// detection idiom impl, from cppreference.com
+
+// Placeholder type yielded by detected_t when Op<Args...> is not well-formed.
+struct nonesuch{};
+
+namespace detail {
+// Primary template: used when Op<Args...> is not well-formed; reports "not detected" and yields Fallback.
+template<typename Fallback, typename AlwaysVoid, template<class...> class Op, typename... Args>
+struct detector {
+  using value_t = std::false_type;
+  using type = Fallback;
+};
+
+// Partial specialization: viable only when Op<Args...> names a valid type; reports "detected" and yields that type.
+template<typename Fallback, template<class...> class Op, typename... Args>
+struct detector<Fallback, std::void_t<Op<Args...>>, Op, Args...> {
+  using value_t = std::true_type;
+  using type = Op<Args...>;
+};
+}  // namespace detail
+
+// std::true_type when Op<Args...> is well-formed, std::false_type otherwise.
+template<template<class...> class Op, typename... Args>
+using is_detected = typename detail::detector<nonesuch, void, Op, Args...>::value_t;
+
+// Boolean shorthand for is_detected<Op, Args...>::value.
+template<template<class...> class Op, typename... Args>
+inline constexpr bool is_detected_v = detail::detector<nonesuch, void, Op, Args...>::value_t::value;
+
+// Op<Args...> when it is well-formed, nonesuch otherwise.
+template<template<class...> class Op, typename... Args>
+using detected_t = typename detail::detector<nonesuch, void, Op, Args...>::type;
+
+// The detector with a caller-chosen fallback: exposes value_t and type (Op<Args...> or Default).
+template<typename Default, template<class...> class Op, typename... Args>
+using detected_or = detail::detector<Default, void, Op, Args...>;
+
+}  // namespace org::apache::nifi::minifi::utils::meta
diff --git a/libminifi/include/utils/net/DNS.h 
b/libminifi/include/utils/net/DNS.h
new file mode 100644
index 0000000..786eeeb
--- /dev/null
+++ b/libminifi/include/utils/net/DNS.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <memory>
+#include <string>
+#include <string_view>
+#include <system_error>
+#include "nonstd/expected.hpp"
+#include "utils/gsl.h"
+
+struct addrinfo;
+
+namespace org::apache::nifi::minifi::utils::net {
+// Deleter that lets std::unique_ptr own an addrinfo list; the call operator is defined out of line.
+struct addrinfo_deleter {
+  void operator()(addrinfo*) const noexcept;
+};
+
+// Transport protocol to resolve addresses for.
+enum class IpProtocol {
+  Tcp,
+  Udp
+};
+
+/**
+ * Resolves a host and port (service) to an owned addrinfo list.
+ * @param hostname host to resolve; the overloads without a hostname pass nullptr here
+ * @param port port or service, as a string
+ * @param protocol transport protocol to resolve for, Tcp by default
+ * @param need_canonname whether the canonical name should be requested
+ * @return the non-null addrinfo list on success, or a std::error_code on failure
+ */
+nonstd::expected<gsl::not_null<std::unique_ptr<addrinfo, addrinfo_deleter>>, std::error_code> resolveHost(const char* hostname, const char* port, IpProtocol protocol = IpProtocol::Tcp,
+    bool need_canonname = false);
+
+// Overload without a hostname: forwards nullptr as the host.
+inline auto resolveHost(const char* port, IpProtocol protocol = IpProtocol::Tcp, bool need_canonname = false) {
+  return resolveHost(nullptr, port, protocol, need_canonname);
+}
+
+// Overload taking a numeric port: the port is converted to its decimal string form before resolving.
+inline auto resolveHost(const char* hostname, uint16_t port, IpProtocol protocol = IpProtocol::Tcp, bool need_canonname = false) {
+  const std::string port_string = std::to_string(port);
+  return resolveHost(hostname, port_string.c_str(), protocol, need_canonname);
+}
+
+// Overload taking only a numeric port: forwards nullptr as the host.
+inline auto resolveHost(uint16_t port, IpProtocol protocol = IpProtocol::Tcp, bool need_canonname = false) {
+  return resolveHost(nullptr, port, protocol, need_canonname);
+}
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/Socket.h 
b/libminifi/include/utils/net/Socket.h
new file mode 100644
index 0000000..65e92c4
--- /dev/null
+++ b/libminifi/include/utils/net/Socket.h
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <string>
+#include <system_error>
+#include <utility>
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif /* WIN32_LEAN_AND_MEAN */
+#include <WinSock2.h>
+#else
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#endif /* WIN32 */
+#include "nonstd/expected.hpp"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils::net {
+#ifdef WIN32
+// Winsock: sockets are SOCKET handles and the failure sentinels come from the Winsock headers.
+using SocketDescriptor = SOCKET;
+using ip4addr = in_addr;
+inline constexpr SocketDescriptor InvalidSocket = INVALID_SOCKET;
+// `inline` like the other header constants, so every translation unit refers to the same variable.
+inline constexpr int SocketError = SOCKET_ERROR;
+#else
+// POSIX: sockets are plain int descriptors and -1 signals failure.
+using SocketDescriptor = int;
+using ip4addr = in_addr_t;
+#undef INVALID_SOCKET
+inline constexpr SocketDescriptor InvalidSocket = -1;
+#undef SOCKET_ERROR
+inline constexpr int SocketError = -1;
+#endif /* WIN32 */
+
+/**
+ * Return the last socket error code, based on errno on posix and 
WSAGetLastError() on windows.
+ */
+std::error_code get_last_socket_error();
+
+inline void close_socket(SocketDescriptor sockfd) {  // closes sockfd with the platform call; its return value is ignored
+#ifdef WIN32
+  closesocket(sockfd);
+#else
+  ::close(sockfd);
+#endif
+}
+
+/**
+ * Move-only owner of a socket descriptor.
+ * The owned descriptor is closed with close_socket() on destruction and before it is replaced by move assignment.
+ * InvalidSocket means "owns nothing"; moved-from and released handles are left in that state.
+ */
+class UniqueSocketHandle {
+ public:
+  explicit UniqueSocketHandle(SocketDescriptor owner_sockfd) noexcept
+      :owner_sockfd_(owner_sockfd)
+  {}
+
+  UniqueSocketHandle(const UniqueSocketHandle&) = delete;
+  UniqueSocketHandle(UniqueSocketHandle&& other) noexcept
+    :owner_sockfd_{std::exchange(other.owner_sockfd_, InvalidSocket)}
+  {}
+  ~UniqueSocketHandle() noexcept {
+    if (owner_sockfd_ != InvalidSocket) close_socket(owner_sockfd_);
+  }
+  UniqueSocketHandle& operator=(const UniqueSocketHandle&) = delete;
+  UniqueSocketHandle& operator=(UniqueSocketHandle&& other) noexcept {
+    if (&other == this) return *this;
+    if (owner_sockfd_ != InvalidSocket) close_socket(owner_sockfd_);
+    owner_sockfd_ = std::exchange(other.owner_sockfd_, InvalidSocket);
+    return *this;
+  }
+
+  // Returns the owned descriptor without giving up ownership.
+  [[nodiscard]] SocketDescriptor get() const noexcept { return owner_sockfd_; }
+  // Gives up ownership: returns the descriptor and leaves this handle empty, so it will not be closed here.
+  [[nodiscard]] SocketDescriptor release() noexcept { return std::exchange(owner_sockfd_, InvalidSocket); }
+  // True when a descriptor is owned.
+  explicit operator bool() const noexcept { return owner_sockfd_ != InvalidSocket; }
+  // Compares the owned descriptors. The argument is taken by const reference: the type is move-only, so a by-value
+  // parameter would reject lvalues and would close the socket of any handle moved into the comparison.
+  bool operator==(const UniqueSocketHandle& other) const noexcept { return owner_sockfd_ == other.owner_sockfd_; }
+
+ private:
+  SocketDescriptor owner_sockfd_;
+};
+
+struct OpenSocketResult {  // success result of open_socket()
+  UniqueSocketHandle socket_;  // owning handle of the socket that was opened
+  gsl::not_null<const addrinfo*> selected_name;  // the addrinfo list element the socket was opened with
+};
+
+/**
+ * Iterate through getaddrinfo_result and try to call socket() until it succeeds
+ * @param getaddrinfo_result first element of the addrinfo list to try, in list order
+ * @return The owning socket handle and the selected list element on success, or the std::error_code
+ * reported by get_last_socket_error() if socket() failed for every list element.
+ */
+nonstd::expected<OpenSocketResult, std::error_code> 
open_socket(gsl::not_null<const addrinfo*> getaddrinfo_result);
+inline nonstd::expected<OpenSocketResult, std::error_code> open_socket(const 
addrinfo& getaddrinfo_result) { return 
open_socket(gsl::make_not_null(&getaddrinfo_result)); }
+
+
+std::string sockaddr_ntop(const sockaddr* sa);
+
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 9606184..05b589c 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -92,7 +92,7 @@ void FlowFile::setDeleted(const bool deleted) {
   }
 }
 
-std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
+std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() const {
   return claim_;
 }
 
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 6c6136f..f4afffd 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -60,6 +60,10 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
+// Copies the bytes held in read_buffer_result.buffer into a std::string of the same length.
+std::string detail::to_string(const detail::ReadBufferResult& read_buffer_result) {
+  const auto& bytes = read_buffer_result.buffer;
+  const auto* const first_char = reinterpret_cast<const char*>(bytes.data());
+  return std::string(first_char, bytes.size());
+}
+
 std::shared_ptr<utils::IdGenerator> ProcessSession::id_generator_ = 
utils::IdGenerator::getIdGenerator();
 
 ProcessSession::ProcessSession(std::shared_ptr<ProcessContext> processContext)
@@ -402,6 +406,30 @@ int64_t ProcessSession::readWrite(const 
std::shared_ptr<core::FlowFile> &flow, I
   }
 }
 
+/**
+ * Reads the whole content of `flow` into an in-memory buffer.
+ * @param flow the flow file to read
+ * @return the filled buffer together with the status returned by read()
+ * @throws Exception (PROCESSOR_EXCEPTION) from the stream callback when fewer bytes than the stream size were read
+ */
+detail::ReadBufferResult ProcessSession::readBuffer(const std::shared_ptr<core::FlowFile>& flow) {
+  // Stream callback that sizes the output buffer to the stream and fills it with a single read call.
+  struct BufferFillingCallback : InputStreamCallback {
+    detail::ReadBufferResult& output;
+    ProcessSession& owner;
+    BufferFillingCallback(detail::ReadBufferResult& output, ProcessSession& owner)
+      :output{output}, owner{owner}
+    {}
+    int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) final {
+      gsl_Expects(inputStream);
+      auto& buffer = output.buffer;
+      buffer.resize(inputStream->size());
+      const auto bytes_read = inputStream->read(reinterpret_cast<uint8_t*>(buffer.data()), buffer.size());
+      if (bytes_read == buffer.size()) {
+        return gsl::narrow<int64_t>(bytes_read);
+      }
+      // A short read is reported and turned into an exception instead of returning a partial buffer.
+      owner.logger_->log_error("readBuffer: %zu bytes were requested from the stream but %zu bytes were read. Rolling back.", buffer.size(), bytes_read);
+      throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
+    }
+  };
+  detail::ReadBufferResult result;
+  BufferFillingCallback callback{result, *this};
+  result.status = read(flow, &callback);
+  return result;
+}
+
 void ProcessSession::importFrom(io::InputStream&& stream, const 
std::shared_ptr<core::FlowFile> &flow) {
   importFrom(stream, flow);
 }
diff --git a/libminifi/src/io/ClientSocket.cpp 
b/libminifi/src/io/ClientSocket.cpp
index 61d502b..18ee695 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -48,20 +48,13 @@
 #include "utils/file/FileUtils.h"
 #include "utils/gsl.h"
 #include "utils/OsUtils.h"
+#include "utils/net/DNS.h"
+#include "utils/net/Socket.h"
 
 namespace util = org::apache::nifi::minifi::utils;
 namespace mio = org::apache::nifi::minifi::io;
 
 namespace {
-std::string get_last_getaddrinfo_err_str(int getaddrinfo_result) {
-#ifdef WIN32
-  (void)getaddrinfo_result;  // against unused warnings on windows
-  return mio::get_last_socket_error_message();
-#else
-  return gai_strerror(getaddrinfo_result);
-#endif /* WIN32 */
-}
-
 template<typename T, typename Pred, typename Adv>
 auto find_if_custom_linked_list(T* const list, const Adv advance_func, const 
Pred predicate) ->
     typename 
std::enable_if<std::is_convertible<decltype(advance_func(std::declval<T*>())), 
T*>::value && std::is_convertible<decltype(predicate(std::declval<T*>())), 
bool>::value, T*>::type
@@ -105,7 +98,7 @@ std::error_code set_non_blocking(const mio::SocketDescriptor 
fd) noexcept {
   }
 #else
   u_long iMode = 1;
-  if (ioctlsocket(fd, FIONBIO, &iMode) == SOCKET_ERROR) {
+  if (ioctlsocket(fd, FIONBIO, &iMode) == mio::SocketError) {
     return { WSAGetLastError(), std::system_category() };
   }
 #endif /* !WIN32 */
@@ -119,18 +112,10 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-std::string get_last_socket_error_message() {
-#ifdef WIN32
-  const auto error_code = WSAGetLastError();
-#else
-  const auto error_code = errno;
-#endif /* WIN32 */
-  return std::system_category().message(error_code);
-}
 
 bool valid_socket(const SocketDescriptor fd) noexcept {
 #ifdef WIN32
-  return fd != INVALID_SOCKET && fd >= 0;
+  return fd != InvalidSocket && fd >= 0;
 #else
   return fd >= 0;
 #endif /* WIN32 */
@@ -179,7 +164,7 @@ Socket& Socket::operator=(Socket &&other) noexcept {
   port_ = std::exchange(other.port_, 0);
   is_loopback_only_ = std::exchange(other.is_loopback_only_, false);
   local_network_interface_ = std::exchange(other.local_network_interface_, {});
-  socket_file_descriptor_ = std::exchange(other.socket_file_descriptor_, 
INVALID_SOCKET);
+  socket_file_descriptor_ = std::exchange(other.socket_file_descriptor_, 
InvalidSocket);
   total_list_ = other.total_list_;
   FD_ZERO(&other.total_list_);
   read_fds_ = other.read_fds_;
@@ -208,7 +193,7 @@ void Socket::close() {
 #else
     ::close(socket_file_descriptor_);
 #endif
-    socket_file_descriptor_ = INVALID_SOCKET;
+    socket_file_descriptor_ = InvalidSocket;
   }
   if (total_written_ > 0) {
     
local_network_interface_.log_write(gsl::narrow<uint32_t>(total_written_.load()));
@@ -229,7 +214,7 @@ void Socket::setNonBlocking() {
 int8_t Socket::createConnection(const addrinfo* const destination_addresses) {
   for (const auto *current_addr = destination_addresses; current_addr; 
current_addr = current_addr->ai_next) {
     if (!valid_socket(socket_file_descriptor_ = 
socket(current_addr->ai_family, current_addr->ai_socktype, 
current_addr->ai_protocol))) {
-      logger_->log_warn("socket: %s", get_last_socket_error_message());
+      logger_->log_warn("socket: %s", 
utils::net::get_last_socket_error().message());
       continue;
     }
     setSocketOptions(socket_file_descriptor_);
@@ -237,20 +222,20 @@ int8_t Socket::createConnection(const addrinfo* const 
destination_addresses) {
     if (listeners_ > 0) {
       // server socket
       const auto bind_result = bind(socket_file_descriptor_, 
current_addr->ai_addr, current_addr->ai_addrlen);
-      if (bind_result == SOCKET_ERROR) {
-        logger_->log_warn("bind: %s", get_last_socket_error_message());
+      if (bind_result == SocketError) {
+        logger_->log_warn("bind: %s", 
utils::net::get_last_socket_error().message());
         close();
         continue;
       }
 
       const auto listen_result = listen(socket_file_descriptor_, listeners_);
-      if (listen_result == SOCKET_ERROR) {
-        logger_->log_warn("listen: %s", get_last_socket_error_message());
+      if (listen_result == SocketError) {
+        logger_->log_warn("listen: %s", 
utils::net::get_last_socket_error().message());
         close();
         continue;
       }
 
-      logger_->log_info("Listening on %s:%" PRIu16 " with backlog %" PRIu16, 
utils::OsUtils::sockaddr_ntop(current_addr->ai_addr), port_, listeners_);
+      logger_->log_info("Listening on %s:%" PRIu16 " with backlog %" PRIu16, 
utils::net::sockaddr_ntop(current_addr->ai_addr), port_, listeners_);
     } else {
       // client socket
 #ifndef WIN32
@@ -262,13 +247,13 @@ int8_t Socket::createConnection(const addrinfo* const 
destination_addresses) {
 #endif /* !WIN32 */
 
       const auto connect_result = connect(socket_file_descriptor_, 
current_addr->ai_addr, current_addr->ai_addrlen);
-      if (connect_result == SOCKET_ERROR) {
-        logger_->log_warn("Couldn't connect to %s:%" PRIu16 ": %s", 
utils::OsUtils::sockaddr_ntop(current_addr->ai_addr), port_, 
get_last_socket_error_message());
+      if (connect_result == SocketError) {
+        logger_->log_warn("Couldn't connect to %s:%" PRIu16 ": %s", 
utils::net::sockaddr_ntop(current_addr->ai_addr), port_, 
utils::net::get_last_socket_error().message());
         close();
         continue;
       }
 
-      logger_->log_info("Connected to %s:%" PRIu16, 
utils::OsUtils::sockaddr_ntop(current_addr->ai_addr), port_);
+      logger_->log_info("Connected to %s:%" PRIu16, 
utils::net::sockaddr_ntop(current_addr->ai_addr), port_);
     }
 
     FD_SET(socket_file_descriptor_, &total_list_);
@@ -293,8 +278,8 @@ int8_t Socket::createConnection(const addrinfo *, ip4addr 
&addr) {
     sa.sin_family = AF_INET;
     sa.sin_port = htons(port_);
     sa.sin_addr.s_addr = htonl(is_loopback_only_ ? INADDR_LOOPBACK : 
INADDR_ANY);
-    if (bind(socket_file_descriptor_, reinterpret_cast<const sockaddr*>(&sa), 
sizeof(struct sockaddr_in)) == SOCKET_ERROR) {
-      logger_->log_error("Could not bind to socket, reason %s", 
get_last_socket_error_message());
+    if (bind(socket_file_descriptor_, reinterpret_cast<const sockaddr*>(&sa), 
sizeof(struct sockaddr_in)) == SocketError) {
+      logger_->log_error("Could not bind to socket, reason %s", 
utils::net::get_last_socket_error().message());
       return -1;
     }
 
@@ -323,7 +308,7 @@ int8_t Socket::createConnection(const addrinfo *, ip4addr 
&addr) {
 #ifdef WIN32
       sa_loc.sin_addr.s_addr = addr.s_addr;
     }
-    if (connect(socket_file_descriptor_, reinterpret_cast<const 
sockaddr*>(&sa_loc), sizeof(sockaddr_in)) == SOCKET_ERROR) {
+    if (connect(socket_file_descriptor_, reinterpret_cast<const 
sockaddr*>(&sa_loc), sizeof(sockaddr_in)) == SocketError) {
       int err = WSAGetLastError();
       if (err == WSAEADDRNOTAVAIL) {
         logger_->log_error("invalid or unknown IP");
@@ -350,31 +335,20 @@ int8_t Socket::createConnection(const addrinfo *, ip4addr 
&addr) {
 }
 
 int Socket::initialize() {
-  addrinfo hints{};
-  memset(&hints, 0, sizeof hints);  // make sure the struct is empty
-  hints.ai_family = AF_UNSPEC;
-  hints.ai_socktype = SOCK_STREAM;
-  hints.ai_flags = AI_CANONNAME;
-  if (listeners_ > 0 && !is_loopback_only_)
-    hints.ai_flags = AI_PASSIVE;
-  hints.ai_protocol = 0; /* any protocol */
-
-  const char* const gai_node = [this]() -> const char* {
+  const char* const hostname = [this]() -> const char* {
     if (is_loopback_only_) return "localhost";
     if (!is_loopback_only_ && listeners_ > 0) return nullptr;  // all 
non-localhost server sockets listen on wildcard address
     if (!requested_hostname_.empty()) return requested_hostname_.c_str();
     return nullptr;
   }();
-  const auto gai_service = std::to_string(port_);
-  addrinfo* getaddrinfo_result = nullptr;
-  const int errcode = getaddrinfo(gai_node, gai_service.c_str(), &hints, 
&getaddrinfo_result);
-  const std::unique_ptr<addrinfo, util::addrinfo_deleter> addr_info{ 
getaddrinfo_result };
-  getaddrinfo_result = nullptr;
-  if (errcode != 0) {
-    logger_->log_error("getaddrinfo: %s", 
get_last_getaddrinfo_err_str(errcode));
+  const bool is_server = hostname == nullptr;
+  const auto addr_info_or_error = utils::net::resolveHost(hostname, port_, 
utils::net::IpProtocol::Tcp, !is_server);
+  if (!addr_info_or_error) {
+    logger_->log_error("getaddrinfo: %s", 
addr_info_or_error.error().message());
     return -1;
   }
-  socket_file_descriptor_ = INVALID_SOCKET;
+  const auto& addr_info = *addr_info_or_error;
+  socket_file_descriptor_ = InvalidSocket;
 
   // AI_CANONNAME always sets ai_canonname of the first addrinfo structure
   canonical_hostname_ = !IsNullOrEmpty(addr_info->ai_canonname) ? 
addr_info->ai_canonname : requested_hostname_;
@@ -487,7 +461,7 @@ size_t Socket::write(const uint8_t *value, size_t size) {
     // check for errors
     if (send_ret <= 0) {
       utils::file::FileUtils::close(fd);
-      logger_->log_error("Could not send to %d, error: %s", fd, 
get_last_socket_error_message());
+      logger_->log_error("Could not send to %d, error: %s", fd, 
utils::net::get_last_socket_error().message());
       return STREAM_ERROR;
     }
     bytes += gsl::narrow<size_t>(send_ret);
diff --git a/libminifi/src/io/tls/TLSSocket.cpp 
b/libminifi/src/io/tls/TLSSocket.cpp
index af8772a..85e3958 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -292,7 +292,7 @@ int16_t TLSSocket::select_descriptor(const uint16_t msec) {
     if (listeners_ > 0) {
       const auto newfd = accept(socket_file_descriptor_, nullptr, nullptr);
       if (!valid_socket(newfd)) {
-        logger_->log_error("accept: %s", get_last_socket_error_message());
+        logger_->log_error("accept: %s", 
utils::net::get_last_socket_error().message());
         return -1;
       }
       FD_SET(newfd, &total_list_);  // add to master set
diff --git a/libminifi/src/utils/NetworkInterfaceInfo.cpp 
b/libminifi/src/utils/NetworkInterfaceInfo.cpp
index 82cf763..3384c6b 100644
--- a/libminifi/src/utils/NetworkInterfaceInfo.cpp
+++ b/libminifi/src/utils/NetworkInterfaceInfo.cpp
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 #include "utils/NetworkInterfaceInfo.h"
-#include "utils/OsUtils.h"
+#include "utils/net/Socket.h"
 #include "core/logging/LoggerConfiguration.h"
 #ifdef WIN32
 #include <Windows.h>
@@ -56,9 +56,9 @@ NetworkInterfaceInfo::NetworkInterfaceInfo(const 
IP_ADAPTER_ADDRESSES* adapter)
   name_ = utf8_encode(adapter->FriendlyName);
   for (auto unicast_address = adapter->FirstUnicastAddress; unicast_address != 
nullptr; unicast_address = unicast_address->Next) {
     if (unicast_address->Address.lpSockaddr->sa_family == AF_INET) {
-      
ip_v4_addresses_.push_back(OsUtils::sockaddr_ntop(unicast_address->Address.lpSockaddr));
+      
ip_v4_addresses_.push_back(net::sockaddr_ntop(unicast_address->Address.lpSockaddr));
     } else if (unicast_address->Address.lpSockaddr->sa_family == AF_INET6) {
-      
ip_v6_addresses_.push_back(OsUtils::sockaddr_ntop(unicast_address->Address.lpSockaddr));
+      
ip_v6_addresses_.push_back(net::sockaddr_ntop(unicast_address->Address.lpSockaddr));
     }
   }
   running_ = adapter->OperStatus == IfOperStatusUp;
@@ -68,9 +68,9 @@ NetworkInterfaceInfo::NetworkInterfaceInfo(const 
IP_ADAPTER_ADDRESSES* adapter)
 NetworkInterfaceInfo::NetworkInterfaceInfo(const struct ifaddrs* ifa) {
   name_ = ifa->ifa_name;
   if (ifa->ifa_addr->sa_family == AF_INET) {
-    ip_v4_addresses_.push_back(OsUtils::sockaddr_ntop(ifa->ifa_addr));
+    ip_v4_addresses_.push_back(net::sockaddr_ntop(ifa->ifa_addr));
   } else if (ifa->ifa_addr->sa_family == AF_INET6) {
-    ip_v6_addresses_.push_back(OsUtils::sockaddr_ntop(ifa->ifa_addr));
+    ip_v6_addresses_.push_back(net::sockaddr_ntop(ifa->ifa_addr));
   }
   running_ = (ifa->ifa_flags & IFF_RUNNING);
   loopback_ = (ifa->ifa_flags & IFF_LOOPBACK);
diff --git a/libminifi/src/utils/OsUtils.cpp b/libminifi/src/utils/OsUtils.cpp
index c4bc58f..cfac877 100644
--- a/libminifi/src/utils/OsUtils.cpp
+++ b/libminifi/src/utils/OsUtils.cpp
@@ -320,40 +320,6 @@ std::string OsUtils::getMachineArchitecture() {
   return "unknown";
 }
 
-namespace {
-std::string get_last_socket_error_message() {
-#ifdef WIN32
-  const auto error_code = WSAGetLastError();
-#else
-  const auto error_code = errno;
-#endif /* WIN32 */
-  return std::system_category().message(error_code);
-}
-}
-
-std::string OsUtils::sockaddr_ntop(const sockaddr* const sa) {
-  std::string result;
-  if (sa->sa_family == AF_INET) {
-    sockaddr_in sa_in{};
-    std::memcpy(reinterpret_cast<void*>(&sa_in), sa, sizeof(sockaddr_in));
-    result.resize(INET_ADDRSTRLEN);
-    if (inet_ntop(AF_INET, &sa_in.sin_addr, &result[0], INET_ADDRSTRLEN) == 
nullptr) {
-      throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, 
get_last_socket_error_message() };
-    }
-  } else if (sa->sa_family == AF_INET6) {
-    sockaddr_in6 sa_in6{};
-    std::memcpy(reinterpret_cast<void*>(&sa_in6), sa, sizeof(sockaddr_in6));
-    result.resize(INET6_ADDRSTRLEN);
-    if (inet_ntop(AF_INET6, &sa_in6.sin6_addr, &result[0], INET6_ADDRSTRLEN) 
== nullptr) {
-      throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, 
get_last_socket_error_message() };
-    }
-  } else {
-    throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, 
"sockaddr_ntop: unknown address family" };
-  }
-  result.resize(strlen(result.c_str()));  // discard remaining null bytes at 
the end
-  return result;
-}
-
 }  // namespace utils
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/src/utils/net/DNS.cpp b/libminifi/src/utils/net/DNS.cpp
new file mode 100644
index 0000000..eac09e2
--- /dev/null
+++ b/libminifi/src/utils/net/DNS.cpp
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/net/DNS.h"
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include "utils/net/Socket.h"
+#else
+#include <netdb.h>
+#include <cstring>
+#endif /* WIN32 */
+
+namespace org::apache::nifi::minifi::utils::net {
+
+namespace {
+
+#ifndef WIN32
+class addrinfo_category : public std::error_category {  // error category whose messages come from gai_strerror()
+ public:
+  [[nodiscard]] const char* name() const noexcept override { return 
"addrinfo"; }
+
+  [[nodiscard]] std::string message(int value) const override {
+    return gai_strerror(value);
+  }
+};
+
+const addrinfo_category& get_addrinfo_category() {  // returns the single function-local static category instance
+  static addrinfo_category instance;
+  return instance;
+}
+#endif
+
+// Converts a failed getaddrinfo() call into a std::error_code: WSAGetLastError() in the system category on Windows
+// (the argument is unused there), the getaddrinfo return value in the addrinfo category elsewhere.
+std::error_code get_last_getaddrinfo_err_code([[maybe_unused]] int getaddrinfo_result) {
+#ifndef WIN32
+  return {getaddrinfo_result, get_addrinfo_category()};
+#else
+  return {WSAGetLastError(), std::system_category()};
+#endif /* !WIN32 */
+}
+}  // namespace
+
+void addrinfo_deleter::operator()(addrinfo* const p) const noexcept {  // hands the list to freeaddrinfo()
+  freeaddrinfo(p);
+}
+
+/**
+ * Resolves hostname/port with getaddrinfo() for any address family.
+ * @param hostname host to resolve; when null, AI_PASSIVE is added to the lookup flags
+ * @param port port or service string passed to getaddrinfo()
+ * @param protocol selects SOCK_STREAM/IPPROTO_TCP or SOCK_DGRAM/IPPROTO_UDP
+ * @param need_canonname when true, AI_CANONNAME is requested
+ * @return the owned, non-null addrinfo list on success, or the lookup error as a std::error_code
+ */
+nonstd::expected<gsl::not_null<std::unique_ptr<addrinfo, addrinfo_deleter>>, std::error_code> resolveHost(const char* const hostname, const char* const port, const IpProtocol protocol,
+    const bool need_canonname) {
+  addrinfo hints{};
+  memset(&hints, 0, sizeof hints);  // make sure the struct is empty
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = protocol == IpProtocol::Tcp ? SOCK_STREAM : SOCK_DGRAM;
+  hints.ai_flags = need_canonname ? AI_CANONNAME : 0;
+  if (!hostname)
+    hints.ai_flags |= AI_PASSIVE;
+  hints.ai_protocol = [protocol]() -> int {
+    switch (protocol) {
+      case IpProtocol::Tcp: return IPPROTO_TCP;
+      case IpProtocol::Udp: return IPPROTO_UDP;
+    }
+    return 0;
+  }();
+
+  addrinfo* getaddrinfo_result = nullptr;
+  const int errcode = getaddrinfo(hostname, port, &hints, &getaddrinfo_result);
+  // Report the failure before building the not_null: on error the result pointer is still null, and wrapping a null
+  // pointer in gsl::not_null is a contract violation that would prevent the error code from ever being returned.
+  if (errcode != 0) {
+    return nonstd::make_unexpected(get_last_getaddrinfo_err_code(errcode));
+  }
+  return gsl::make_not_null(std::unique_ptr<addrinfo, addrinfo_deleter>{getaddrinfo_result});
+}
+
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/utils/net/Socket.cpp 
b/libminifi/src/utils/net/Socket.cpp
new file mode 100644
index 0000000..533dd94
--- /dev/null
+++ b/libminifi/src/utils/net/Socket.cpp
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/net/Socket.h"
+#include "Exception.h"
+#include <cstring>
+#include <system_error>
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif /* WIN32_LEAN_AND_MEAN */
+#include <ws2tcpip.h>
+#else
+#include <arpa/inet.h>
+#endif /* WIN32 */
+
+namespace org::apache::nifi::minifi::utils::net {
+// Returns the most recent socket error as a std::error_code in the system category:
+// WSAGetLastError() on Windows, errno elsewhere.
+std::error_code get_last_socket_error() {
+#ifndef WIN32
+  return std::error_code{errno, std::system_category()};
+#else
+  return std::error_code{WSAGetLastError(), std::system_category()};
+#endif /* !WIN32 */
+}
+
+// Walks the addrinfo list starting at getaddrinfo_result and returns the first socket that can be created together
+// with the list element it was created from; if socket() fails for every element, returns the last socket error.
+nonstd::expected<OpenSocketResult, std::error_code> open_socket(const gsl::not_null<const addrinfo*> getaddrinfo_result) {
+  const addrinfo* candidate = getaddrinfo_result;
+  while (candidate) {
+    const auto fd = socket(candidate->ai_family, candidate->ai_socktype, candidate->ai_protocol);
+    if (fd != utils::net::InvalidSocket) {
+      return OpenSocketResult{UniqueSocketHandle{fd}, gsl::make_not_null(candidate)};
+    }
+    candidate = candidate->ai_next;
+  }
+  return nonstd::make_unexpected(get_last_socket_error());
+}
+
+/**
+ * Formats the IPv4 or IPv6 address stored in `sa` as text using inet_ntop().
+ * @param sa socket address whose sa_family is AF_INET or AF_INET6
+ * @return the textual address, without trailing null bytes
+ * @throws minifi::Exception (GENERAL_EXCEPTION) if inet_ntop() fails or the address family is neither of the two
+ */
+std::string sockaddr_ntop(const sockaddr* const sa) {
+  std::string text;
+  switch (sa->sa_family) {
+    case AF_INET: {
+      sockaddr_in ipv4{};
+      std::memcpy(&ipv4, sa, sizeof(sockaddr_in));
+      text.resize(INET_ADDRSTRLEN);
+      if (!inet_ntop(AF_INET, &ipv4.sin_addr, &text[0], INET_ADDRSTRLEN)) {
+        throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, get_last_socket_error().message() };
+      }
+      break;
+    }
+    case AF_INET6: {
+      sockaddr_in6 ipv6{};
+      std::memcpy(&ipv6, sa, sizeof(sockaddr_in6));
+      text.resize(INET6_ADDRSTRLEN);
+      if (!inet_ntop(AF_INET6, &ipv6.sin6_addr, &text[0], INET6_ADDRSTRLEN)) {
+        throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, get_last_socket_error().message() };
+      }
+      break;
+    }
+    default:
+      throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, "sockaddr_ntop: unknown address family" };
+  }
+  text.resize(strlen(text.c_str()));  // discard remaining null bytes at the end
+  return text;
+}
+
+}  // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/test/SingleInputTestController.h 
b/libminifi/test/SingleInputTestController.h
new file mode 100644
index 0000000..16c5354
--- /dev/null
+++ b/libminifi/test/SingleInputTestController.h
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <memory>
+#include <set>
+#include <string>
+#include <string_view>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+#include "TestBase.h"
+#include "core/Processor.h"
+
+namespace org::apache::nifi::minifi::test {
+/**
+ * Test controller built around a single processor: the processor is added to the plan, gets one incoming
+ * connection and one outgoing connection per supported relationship, so a test can feed it a flow file
+ * and inspect what arrives on each relationship.
+ */
+class SingleInputTestController : public TestController {
+ public:
+  explicit SingleInputTestController(const std::shared_ptr<core::Processor>& processor)
+      : processor_{plan->addProcessor(processor, processor->getName())}
+  {}
+
+  /**
+   * Puts one flow file with the given content and attributes on the input connection, runs the processor
+   * and drains every outgoing connection.
+   * @return the flow files polled from each outgoing connection, keyed by relationship
+   */
+  std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>>
+  trigger(const std::string_view input_flow_file_content, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
+    using FlowFiles = std::vector<std::shared_ptr<core::FlowFile>>;
+
+    const auto incoming = createFlowFile(input_flow_file_content, std::move(input_flow_file_attributes));
+    input_->put(incoming);
+    plan->runProcessor(processor_);
+
+    std::unordered_map<core::Relationship, FlowFiles> outputs;
+    for (const auto& entry : outgoing_connections_) {
+      const auto& out_connection = entry.second;
+      std::set<std::shared_ptr<core::FlowFile>> expired_flow_files;
+      FlowFiles collected;
+      while (out_connection->isWorkAvailable()) {
+        auto polled = out_connection->poll(expired_flow_files);
+        CHECK(expired_flow_files.empty());
+        if (polled) {
+          collected.push_back(std::move(polled));
+        }
+      }
+      outputs.insert_or_assign(entry.first, std::move(collected));
+    }
+    return outputs;
+  }
+
+  /** Creates a relationship with the given name and an empty description, and attaches an outgoing connection for it. */
+  core::Relationship addDynamicRelationship(std::string name) {
+    core::Relationship relationship{std::move(name), ""};
+    outgoing_connections_.insert_or_assign(relationship, plan->addConnection(processor_, relationship, nullptr));
+    return relationship;
+  }
+
+ private:
+  /** Builds a flow file carrying the given attributes whose content is written through the plan's content repository. */
+  std::shared_ptr<core::FlowFile> createFlowFile(const std::string_view content, std::unordered_map<std::string, std::string> attributes) {
+    const auto flow_file = std::make_shared<FlowFileRecord>();
+    for (auto& [key, value] : attributes) {
+      flow_file->setAttribute(key, std::move(value));
+    }
+
+    auto session = plan->getContentRepo()->createSession();
+    auto claim = session->create();
+    auto stream = session->write(claim);
+    stream->write(reinterpret_cast<const uint8_t*>(content.data()), content.size());
+    flow_file->setResourceClaim(claim);
+    flow_file->setSize(stream->size());
+    flow_file->setOffset(0);
+
+    stream->close();
+    session->commit();
+    return flow_file;
+  }
+
+  // NOTE: the members below are initialised in declaration order and each initialiser uses the ones before it
+  // (plan -> processor_ -> outgoing_connections_ / input_), so the order must not be changed.
+ public:
+  std::shared_ptr<TestPlan> plan = createPlan();
+
+ protected:
+  core::Processor& getProcessor() const { return *processor_; }
+
+ private:
+  std::shared_ptr<core::Processor> processor_;
+  // one outgoing connection for every relationship the processor supports
+  std::unordered_map<core::Relationship, std::shared_ptr<Connection>> outgoing_connections_{[this] {
+    std::unordered_map<core::Relationship, std::shared_ptr<Connection>> connections;
+    for (const auto& supported : processor_->getSupportedRelationships()) {
+      connections.insert_or_assign(supported, plan->addConnection(processor_, supported, nullptr));
+    }
+    return connections;
+  }()};
+  std::shared_ptr<Connection> input_ = plan->addConnection(nullptr, core::Relationship{"success", "success"}, processor_);
+};
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 4dc83b3..324cb37 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -592,8 +592,8 @@ void TestPlan::validateAnnotations() const {
   }
 }
 
-std::string TestPlan::getContent(const 
std::shared_ptr<minifi::core::FlowFile>& file) {
-  auto content_claim = file->getResourceClaim();
+std::string TestPlan::getContent(const minifi::core::FlowFile& file) const {
+  auto content_claim = file.getResourceClaim();
   auto content_stream = content_repo_->read(*content_claim);
   auto output_stream = std::make_shared<minifi::io::BufferStream>();
   minifi::InputStreamPipe{output_stream}.process(content_stream);
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index bbee252..10833b0 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -247,7 +247,8 @@ class TestPlan {
     return state_manager_provider_;
   }
 
-  std::string getContent(const std::shared_ptr<minifi::core::FlowFile>& file);
+  std::string getContent(const std::shared_ptr<const minifi::core::FlowFile>& 
file) const { return getContent(*file); }
+  std::string getContent(const minifi::core::FlowFile& file) const;
 
   void finalize();
 
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h 
b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 581b68e..a506e6b 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -42,18 +42,6 @@ struct WriteStringToFlowFile : public 
minifi::OutputStreamCallback {
   }
 };
 
-struct ReadUntilStreamSize : public minifi::InputStreamCallback {
-  std::string value_;
-
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) 
override {
-    value_.clear();
-    std::vector<uint8_t> buffer;
-    size_t bytes_read = stream->read(buffer, stream->size());
-    value_.assign(buffer.begin(), buffer.end());
-    return minifi::io::isError(bytes_read) ? -1 : 
gsl::narrow<int64_t>(bytes_read);
-  }
-};
-
 struct ReadUntilItCan : public minifi::InputStreamCallback {
   std::string value_;
 
@@ -132,22 +120,21 @@ void 
testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
   auto clone_second_half = process_session.clone(original_ff, 3, 3);
   REQUIRE(clone_first_half != nullptr);
   REQUIRE(clone_second_half != nullptr);
-  ReadUntilStreamSize read_until_stream_size_callback;
   ReadUntilItCan read_until_it_can_callback;
-  process_session.read(original_ff, &read_until_stream_size_callback);
+  const auto read_result_original = process_session.readBuffer(original_ff);
   process_session.read(original_ff, &read_until_it_can_callback);
   CHECK(original_ff->getSize() == 6);
-  CHECK(read_until_stream_size_callback.value_ == "foobar");
+  CHECK(to_string(read_result_original) == "foobar");
   CHECK(read_until_it_can_callback.value_ == "foobar");
-  process_session.read(clone_first_half, &read_until_stream_size_callback);
+  const auto read_result_first_half = 
process_session.readBuffer(clone_first_half);
   process_session.read(clone_first_half, &read_until_it_can_callback);
   CHECK(clone_first_half->getSize() == 3);
-  CHECK(read_until_stream_size_callback.value_ == "foo");
+  CHECK(to_string(read_result_first_half) == "foo");
   CHECK(read_until_it_can_callback.value_ == "foo");
-  process_session.read(clone_second_half, &read_until_stream_size_callback);
+  const auto read_result_second_half = 
process_session.readBuffer(clone_second_half);
   process_session.read(clone_second_half, &read_until_it_can_callback);
   CHECK(clone_second_half->getSize() == 3);
-  CHECK(read_until_stream_size_callback.value_ == "bar");
+  CHECK(to_string(read_result_second_half) == "bar");
   CHECK(read_until_it_can_callback.value_ == "bar");
 }
 
@@ -163,11 +150,10 @@ void 
testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> cont
   fixture.transferAndCommit(flow_file);
 
   CHECK(flow_file->getSize() == 8);
-  ReadUntilStreamSize read_until_stream_size_callback;
   ReadUntilItCan read_until_it_can_callback;
-  process_session.read(flow_file, &read_until_stream_size_callback);
+  const auto read_result = process_session.readBuffer(flow_file);
   process_session.read(flow_file, &read_until_it_can_callback);
-  CHECK(read_until_stream_size_callback.value_ == "myfoobar");
+  CHECK(to_string(read_result) == "myfoobar");
   CHECK(read_until_it_can_callback.value_ == "myfoobar");
 }
 
@@ -182,11 +168,10 @@ void 
testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> conten
   fixture.transferAndCommit(flow_file);
 
   CHECK(flow_file->getSize() == 8);
-  ReadUntilStreamSize read_until_stream_size_callback;
+  const auto read_result = process_session.readBuffer(flow_file);
   ReadUntilItCan read_until_it_can_callback;
-  process_session.read(flow_file, &read_until_stream_size_callback);
   process_session.read(flow_file, &read_until_it_can_callback);
-  CHECK(read_until_stream_size_callback.value_ == "myfoobar");
+  CHECK(to_string(read_result) == "myfoobar");
   CHECK(read_until_it_can_callback.value_ == "myfoobar");
 }
 }  // namespace ContentRepositoryDependentTests
diff --git a/libminifi/test/unit/ExpectedTest.cpp 
b/libminifi/test/unit/ExpectedTest.cpp
new file mode 100644
index 0000000..c686ef6
--- /dev/null
+++ b/libminifi/test/unit/ExpectedTest.cpp
@@ -0,0 +1,485 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define EXTENSION_LIST ""
+#include <string_view>
+#include "../TestBase.h"
+#include "utils/expected.h"
+#include "utils/gsl.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+// shamelessly copied from https://github.com/TartanLlama/expected/blob/master/tests/extensions.cpp (License: CC0)
+// Checks that piping an expected into utils::map applies the callable to a held value and passes a held error
+// through unchanged, for every const / non-const and lvalue / rvalue operand, and that a void-returning callable
+// produces an expected<void, E>.
+TEST_CASE("expected map", "[expected][map]") {
+  auto mul2 = [](int a) { return a * 2; };
+  auto ret_void = [](int) {};
+
+  {  // mul2 on a value: non-const lvalue
+    nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::map(mul2);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // mul2 on a value: const lvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::map(mul2);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // mul2 on a value: non-const rvalue
+    nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::map(mul2);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // mul2 on a value: const rvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::map(mul2);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // mul2 on an error: non-const lvalue, the error is kept
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::map(mul2);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // mul2 on an error: const lvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::map(mul2);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // mul2 on an error: non-const rvalue
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::map(mul2);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // mul2 on an error: const rvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::map(mul2);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // ret_void on a value: non-const lvalue, result type becomes expected<void, int>
+    nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::map(ret_void);
+    REQUIRE(ret);
+    STATIC_REQUIRE(
+        (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+  }
+
+  {  // ret_void on a value: const lvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::map(ret_void);
+    REQUIRE(ret);
+    STATIC_REQUIRE(
+        (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+  }
+
+  {  // ret_void on a value: non-const rvalue
+    nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::map(ret_void);
+    REQUIRE(ret);
+    STATIC_REQUIRE(
+        (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+  }
+
+  {  // ret_void on a value: const rvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::map(ret_void);
+    REQUIRE(ret);
+    STATIC_REQUIRE(
+        (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+  }
+
+  {  // ret_void on an error: non-const lvalue
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::map(ret_void);
+    REQUIRE(!ret);
+    STATIC_REQUIRE(
+        (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+  }
+
+  {  // ret_void on an error: const lvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::map(ret_void);
+    REQUIRE(!ret);
+    STATIC_REQUIRE(
+        (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+  }
+
+  {  // ret_void on an error: non-const rvalue
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::map(ret_void);
+    REQUIRE(!ret);
+    STATIC_REQUIRE(
+        (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+  }
+
+  {  // ret_void on an error: const rvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::map(ret_void);
+    REQUIRE(!ret);
+    STATIC_REQUIRE(
+        (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+  }
+
+
+  // mapping functions which return references
+  {
+    nonstd::expected<int, int> e(42);
+    auto ret = e | utils::map([](int& i) -> int& { return i; });
+    REQUIRE(ret);
+    REQUIRE(ret == 42);
+  }
+}
+
+// Checks utils::flatMap for every const / non-const and lvalue / rvalue operand: with a held value the result is
+// whatever the callable returns (value or error); with a held error the original error is kept.
+TEST_CASE("expected flatMap", "[expected][flatMap]") {
+  auto succeed = [](int) { return nonstd::expected<int, int>(21 * 2); };
+  auto fail = [](int) { return nonstd::expected<int, int>(nonstd::unexpect, 
17); };
+
+  {  // value + succeed: non-const lvalue
+    nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::flatMap(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // value + succeed: const lvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::flatMap(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // value + succeed: non-const rvalue
+    nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::flatMap(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // value + succeed: const rvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::flatMap(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // value + fail: non-const lvalue, the callable's error (17) is returned
+    nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::flatMap(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 17);
+  }
+
+  {  // value + fail: const lvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::flatMap(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 17);
+  }
+
+  {  // value + fail: non-const rvalue
+    nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::flatMap(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 17);
+  }
+
+  {  // value + fail: const rvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::flatMap(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 17);
+  }
+
+  {  // error + succeed: non-const lvalue, the original error (21) is kept
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::flatMap(succeed);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // error + succeed: const lvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::flatMap(succeed);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // error + succeed: non-const rvalue
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::flatMap(succeed);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // error + succeed: const rvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::flatMap(succeed);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // error + fail: non-const lvalue, still the original error (21), not the callable's (17)
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::flatMap(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // error + fail: const lvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::flatMap(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // error + fail: non-const rvalue
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::flatMap(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // error + fail: const rvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::flatMap(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+}
+
+// Checks utils::orElse for every const / non-const and lvalue / rvalue operand: a held value is returned untouched;
+// with a held error the result is what the callable returns, except that a void-returning callable leaves the
+// original error in place.
+TEST_CASE("expected orElse", "[expected][orElse]") {
+  // commented out parts depend on 
https://github.com/martinmoene/expected-lite/pull/40 (waiting for release)
+  // using eptr = std::unique_ptr<int>;
+  auto succeed = [](int) { return nonstd::expected<int, int>(21 * 2); };
+  // auto succeedptr = [](eptr e) { return nonstd::expected<int, eptr>(21*2);};
+  auto fail =    [](int) { return nonstd::expected<int, int>(nonstd::unexpect, 
17);};
+  // auto efail =   [](eptr e) { *e = 17;return nonstd::expected<int, 
eptr>(nonstd::unexpect, std::move(e));};
+  // auto failptr = [](eptr e) { return nonstd::expected<int, 
eptr>(nonstd::unexpect, std::move(e));};
+  auto failvoid = [](int) {};
+  // auto failvoidptr = [](const eptr&) { /* don't consume */};
+  // auto consumeptr = [](eptr) {};
+  // auto make_u_int = [](int n) { return std::unique_ptr<int>(new int(n));};
+
+  {  // value + succeed: non-const lvalue, the held value (21) wins
+    nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::orElse(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+
+  {  // value + succeed: const lvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::orElse(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+
+  {  // value + succeed: non-const rvalue
+    nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::orElse(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+
+  /*
+  {
+    nonstd::expected<int, eptr> e = 21;
+    auto ret = std::move(e) | utils::orElse(succeedptr);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+   */
+
+  {  // value + succeed: const rvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::orElse(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+
+  {  // value + fail: non-const lvalue, the held value still wins
+    nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::orElse(fail);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+
+  {  // value + fail: const lvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::orElse(fail);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+
+  {  // value + fail: non-const rvalue
+    nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::orElse(fail);
+    REQUIRE(ret);
+    REQUIRE(ret == 21);
+  }
+
+
+  /*
+  {
+    nonstd::expected<int, eptr> e = 21;
+    auto ret = std::move(e) | utils::orElse(efail);
+    REQUIRE(ret);
+    REQUIRE(ret == 21);
+  }
+   */
+
+  {  // value + fail: const rvalue
+    const nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::orElse(fail);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+
+  {  // error + succeed: non-const lvalue, the callable's value (42) replaces the error
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::orElse(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // error + succeed: const lvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::orElse(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // error + succeed: non-const rvalue
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::orElse(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  /*
+  {
+    nonstd::expected<int, eptr> e(nonstd::unexpect, make_u_int(21));
+    auto ret = std::move(e) | utils::orElse(succeedptr);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+   */
+
+  {  // error + succeed: const rvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::orElse(succeed);
+    REQUIRE(ret);
+    REQUIRE(*ret == 42);
+  }
+
+  {  // error + fail: non-const lvalue, the callable's error (17) replaces the original
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::orElse(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 17);
+  }
+
+  {  // error + failvoid: non-const lvalue, a void callable keeps the original error (21)
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::orElse(failvoid);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // error + fail: const lvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::orElse(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 17);
+  }
+
+  {  // error + failvoid: const lvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = e | utils::orElse(failvoid);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  {  // error + fail: non-const rvalue
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::orElse(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 17);
+  }
+
+  {  // error + failvoid: non-const rvalue
+    nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::orElse(failvoid);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+
+  /*
+  {
+    nonstd::expected<int, eptr> e(nonstd::unexpect, make_u_int(21));
+    auto ret = std::move(e) | utils::orElse(failvoidptr);
+    REQUIRE(!ret);
+    REQUIRE(*ret.error() == 21);
+  }
+
+  {
+    nonstd::expected<int, eptr> e(nonstd::unexpect, make_u_int(21));
+    auto ret = std::move(e) | utils::orElse(consumeptr);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == nullptr);
+  }
+   */
+
+  {  // error + fail: const rvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::orElse(fail);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 17);
+  }
+
+  {  // error + failvoid: const rvalue
+    const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+    auto ret = std::move(e) | utils::orElse(failvoid);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 21);
+  }
+}
+
+// Exercises utils::valueOrElse on an expected that holds an error: the fallback may take no argument or the error
+// (by value, const reference or rvalue reference) and either supplies the replacement value or throws.
+TEST_CASE("expected valueOrElse", "[expected][valueOrElse]") {
+  using namespace std::literals;  // NOLINT
+  nonstd::expected<int, std::string> ex{nonstd::unexpect, "hello"};  // holds the error "hello" throughout
+  // nullary fallback: returns the replacement value, or throws
+  REQUIRE(42 == (ex | utils::valueOrElse([] { return 42; })));
+  REQUIRE_THROWS_AS(ex | utils::valueOrElse([]{ throw 42; }), int);
+  // fallback receiving the error: the replacement value is derived from the error string
+  REQUIRE(gsl::narrow<int>("hello"sv.size()) == (ex | 
utils::valueOrElse([](const std::string& err) { return 
gsl::narrow<int>(err.size()); })));
+  // throwing fallbacks taking the error by value, by const reference and by rvalue reference
+  REQUIRE_THROWS_AS(ex | utils::valueOrElse([](std::string){ throw 42; }), 
int);
+  REQUIRE_THROWS_AS(ex | utils::valueOrElse([](const std::string&) -> int { 
throw 42; }), int);
+  REQUIRE_THROWS_AS(std::move(ex) | utils::valueOrElse([](std::string&&) -> 
int { throw 42; }), int);
+}
diff --git a/libminifi/test/unit/GeneralUtilsTest.cpp 
b/libminifi/test/unit/GeneralUtilsTest.cpp
index b75e880..de017c0 100644
--- a/libminifi/test/unit/GeneralUtilsTest.cpp
+++ b/libminifi/test/unit/GeneralUtilsTest.cpp
@@ -59,7 +59,20 @@ static_assert(!does_compile<1, 0>::value, "constexpr 
division by zero shouldn't
 
 TEST_CASE("GeneralUtils::dereference", "[dereference]") {
   const int a = 42;
-  const auto* const pa = &a;
+  const auto pa = gsl::make_not_null(&a);
   REQUIRE(42 == utils::dereference(pa));
   REQUIRE(&a == &utils::dereference(pa));
+
+  const auto uniq_a = gsl::make_not_null(std::make_unique<int>(99));
+  REQUIRE(99 == utils::dereference(uniq_a));
+}
+
+TEST_CASE("GeneralUtils::unsafe_dereference", "[unsafe_dereference]") {
+  const int a = 42;
+  const int* const pa = &a;
+  REQUIRE(42 == utils::unsafe_dereference(pa));
+  REQUIRE(&a == &utils::unsafe_dereference(pa));
+
+  const auto uniq_a = std::make_unique<int>(99);
+  REQUIRE(99 == utils::unsafe_dereference(uniq_a));
 }
diff --git a/libminifi/test/unit/NetUtilsTest.cpp 
b/libminifi/test/unit/NetUtilsTest.cpp
new file mode 100644
index 0000000..9e28980
--- /dev/null
+++ b/libminifi/test/unit/NetUtilsTest.cpp
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <functional>
+#include <string>
+#include <type_traits>
+
+#include "../TestBase.h"
+#include "utils/net/DNS.h"
+#include "utils/net/Socket.h"
+#include "utils/StringUtils.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+namespace net = utils::net;
+
+// Resolves two loopback names and checks the textual form of the address that net::resolveHost returns.
+TEST_CASE("net::resolveHost", "[net][dns][utils][resolveHost]") {
+  // a numeric IPv4 literal has to come back as itself
+  const auto numeric_result = net::resolveHost("127.0.0.1", "10080");
+  REQUIRE(net::sockaddr_ntop(numeric_result.value()->ai_addr) == "127.0.0.1");
+
+  // "localhost" is accepted as any address starting with 127 or as the IPv6 loopback
+  const auto localhost_result = net::resolveHost("localhost", "10080");
+  const auto localhost_address = net::sockaddr_ntop(localhost_result.value()->ai_addr);
+  REQUIRE((utils::StringUtils::startsWith(localhost_address, "127") || localhost_address == "::1"));
+}
diff --git a/libminifi/test/unit/OptionalTest.cpp 
b/libminifi/test/unit/OptionalTest.cpp
index 3be8870..d713901 100644
--- a/libminifi/test/unit/OptionalTest.cpp
+++ b/libminifi/test/unit/OptionalTest.cpp
@@ -60,3 +60,14 @@ TEST_CASE("optional orElse", "[optional or else]") {
   REQUIRE(!test4);
   REQUIRE_THROWS_AS(std::optional<bool>{} | utils::orElse([]{ throw ex{}; }), 
ex);
 }
+
+// utils::valueOrElse on std::optional: an engaged optional yields its own value, an empty one yields the
+// fallback's result or propagates whatever the fallback throws.
+TEST_CASE("optional valueOrElse", "[optional][valueOrElse]") {
+  // engaged + throwing fallback: obtaining 7 without an exception shows the fallback was not used
+  const auto engaged_with_throwing_fallback = std::make_optional(7) | utils::valueOrElse([]() -> int { throw std::exception{}; });
+  // engaged + returning fallback: the held 6 wins over the fallback's 49
+  const auto engaged_with_returning_fallback = std::make_optional(6) | utils::valueOrElse([] { return 49; });
+  // empty + fallback returning a different type (size_t) than the optional's value type
+  const auto empty_with_returning_fallback = std::optional<int>{} | utils::valueOrElse([] { return size_t{0}; });
+
+  REQUIRE(7 == engaged_with_throwing_fallback);
+  REQUIRE(6 == engaged_with_returning_fallback);
+  REQUIRE(0 == empty_with_returning_fallback);
+  REQUIRE_THROWS_AS(std::optional<int>{} | utils::valueOrElse([]() -> int { throw std::exception{}; }), std::exception);
+}
diff --git a/libminifi/test/unit/ProcessContextTest.cpp 
b/libminifi/test/unit/ProcessContextTest.cpp
new file mode 100644
index 0000000..a9947d7
--- /dev/null
+++ b/libminifi/test/unit/ProcessContextTest.cpp
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include "ProcessContext.h"
+#include "core/FlowFile.h"
+#include "utils/meta/detected.h"
+
+namespace org::apache::nifi::minifi {
+
+// Detection archetype: well-formed only when core::ProcessContext::getProperty is callable with arguments of types Args...
+template<typename... Args>
+using getProperty_compiles = decltype(std::declval<core::ProcessContext>().getProperty(std::declval<Args>()...));
+
+// True when such a getProperty call would compile.
+template<typename... Args>
+inline constexpr bool getProperty_is_callable_with = utils::meta::is_detected_v<getProperty_compiles, Args...>;
+
+// Compile-time checks: core::ProcessContext::getProperty must not accept a flow file as its second argument,
+// whether the property name is a std::string or a const char* and however the flow file is passed.
+
+// property name given as std::string
+static_assert(!getProperty_is_callable_with<std::string, const core::FlowFile&>);
+static_assert(!getProperty_is_callable_with<std::string, const std::shared_ptr<core::FlowFile>&>);
+static_assert(!getProperty_is_callable_with<std::string, core::FlowFile&>);
+static_assert(!getProperty_is_callable_with<std::string, std::shared_ptr<core::FlowFile>>);
+
+// property name given as const char*
+static_assert(!getProperty_is_callable_with<const char*, const core::FlowFile&>);
+static_assert(!getProperty_is_callable_with<const char*, const std::shared_ptr<core::FlowFile>&>);
+static_assert(!getProperty_is_callable_with<const char*, core::FlowFile&>);
+static_assert(!getProperty_is_callable_with<const char*, std::shared_ptr<core::FlowFile>>);
+
+}  // namespace org::apache::nifi::minifi

Reply via email to