This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new feb5a1b MINIFICPP-1678 - Create PutUDP processor, add expected-lite
feb5a1b is described below
commit feb5a1b183dc82dd83c6ce53c8c20ba819c27863
Author: Marton Szasz <[email protected]>
AuthorDate: Wed Jan 12 13:04:41 2022 +0100
MINIFICPP-1678 - Create PutUDP processor, add expected-lite
Signed-off-by: Adam Debreceni <[email protected]>
This closes #1208
---
CMakeLists.txt | 3 +
LICENSE | 30 ++
NOTICE | 2 +
PROCESSORS.md | 23 +
README.md | 2 +-
cmake/ExpectedLite.cmake | 24 +
.../expression-language/ProcessContextExpr.cpp | 4 +
.../expression-language/ProcessContextExpr.h | 13 +-
.../standard-processors/processors/PutUDP.cpp | 163 +++++++
extensions/standard-processors/processors/PutUDP.h | 54 +++
.../TLSServerSocketSupportedProtocolsTest.cpp | 9 +-
.../standard-processors/tests/unit/PutUDPTests.cpp | 112 +++++
.../tests/unit/TailFileTests.cpp | 2 +-
libminifi/CMakeLists.txt | 4 +-
libminifi/include/core/ConfigurableComponent.h | 20 +-
libminifi/include/core/FlowFile.h | 30 +-
libminifi/include/core/ProcessContext.h | 26 +-
libminifi/include/core/ProcessSession.h | 11 +-
libminifi/include/core/Relationship.h | 8 +-
libminifi/include/io/ClientSocket.h | 34 +-
libminifi/include/utils/Deleters.h | 10 +-
libminifi/include/utils/GeneralUtils.h | 7 +-
libminifi/include/utils/OptionalUtils.h | 53 +--
libminifi/include/utils/OsUtils.h | 1 -
libminifi/include/utils/StringUtils.h | 21 +-
.../utils/detail/MonadicOperationWrappers.h | 56 +++
libminifi/include/utils/expected.h | 174 ++++++++
libminifi/include/utils/gsl.h | 7 +
libminifi/include/utils/meta/detected.h | 52 +++
libminifi/include/utils/net/DNS.h | 48 ++
libminifi/include/utils/net/Socket.h | 108 +++++
libminifi/src/core/FlowFile.cpp | 2 +-
libminifi/src/core/ProcessSession.cpp | 28 ++
libminifi/src/io/ClientSocket.cpp | 78 ++--
libminifi/src/io/tls/TLSSocket.cpp | 2 +-
libminifi/src/utils/NetworkInterfaceInfo.cpp | 10 +-
libminifi/src/utils/OsUtils.cpp | 34 --
libminifi/src/utils/net/DNS.cpp | 94 ++++
libminifi/src/utils/net/Socket.cpp | 72 +++
libminifi/test/SingleInputTestController.h | 99 +++++
libminifi/test/TestBase.cpp | 4 +-
libminifi/test/TestBase.h | 3 +-
.../test/unit/ContentRepositoryDependentTests.h | 35 +-
libminifi/test/unit/ExpectedTest.cpp | 485 +++++++++++++++++++++
libminifi/test/unit/GeneralUtilsTest.cpp | 15 +-
libminifi/test/unit/NetUtilsTest.cpp | 35 ++
libminifi/test/unit/OptionalTest.cpp | 11 +
libminifi/test/unit/ProcessContextTest.cpp | 36 ++
48 files changed, 1895 insertions(+), 259 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a4d924a..7c3f134 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -370,6 +370,9 @@ target_compile_definitions(gsl-lite INTERFACE
${GslDefinitions})
# date
include(Date)
+# expected-lite
+include(ExpectedLite)
+
# Update passthrough args used in configurations in patch commands
if (WIN32)
set(PASSTHROUGH_CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /w")
diff --git a/LICENSE b/LICENSE
index 5bcf14d..58467d6 100644
--- a/LICENSE
+++ b/LICENSE
@@ -3065,3 +3065,33 @@ SGI C++ Standard Template Library license
//
--------------------------------------------------------------------------
+
+This project bundles 'expected-lite' under the Boost Software License 1.0
+
+Boost Software License - Version 1.0 - August 17th, 2003
+
+Permission is hereby granted, free of charge, to any person or organization
+obtaining a copy of the software and accompanying documentation covered by
+this license (the "Software") to use, reproduce, display, distribute,
+execute, and transmit the Software, and to prepare derivative works of the
+Software, and to permit third-parties to whom the Software is furnished to
+do so, all subject to the following:
+
+The copyright notices in the Software and this entire statement, including
+the above license grant, this restriction and the following disclaimer,
+must be included in all copies of the Software, in whole or in part, and
+all derivative works of the Software, unless such copies or derivative
+works are solely in the form of machine-executable object code generated by
+a source language processor.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
+
+---------------------------------------------------------------------------
+This project reuses test code from TartanLlama/expected from the Public Domain
or under CC0
+https://creativecommons.org/publicdomain/zero/1.0/
diff --git a/NOTICE b/NOTICE
index b8cf4a2..4715f25 100644
--- a/NOTICE
+++ b/NOTICE
@@ -58,6 +58,8 @@ This software includes third party software subject to the
following copyrights:
- IANA timezone database - public domain
- date (HowardHinnant/date) - notices below
- range-v3 - Eric Niebler and other contributors
+- expected-lite - Copyright (C) 2016-2020 Martin Moene.
+- TartanLlama/expected - public domain, thanks to Sy Brand
The licenses for these third party components are included in LICENSE.txt
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 2445d48..5b393a0 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -50,6 +50,7 @@
- [PutS3Object](#puts3object)
- [PutSFTP](#putsftp)
- [PutSQL](#putsql)
+- [PutUDP](#putudp)
- [QueryDatabaseTable](#querydatabasetable)
- [ReplaceText](#replacetext)
- [RetryFlowFile](#retryflowfile)
@@ -1493,6 +1494,28 @@ In the list below, the names of required properties
appear in bold. Any other pr
| - | - |
|success|After a successful SQL update operation, the incoming FlowFile sent
here|
+## PutUDP
+
+### Description
+
+The PutUDP processor receives a FlowFile and packages the FlowFile content
into a single UDP datagram packet which is then transmitted to the configured
UDP server.
+The processor doesn't guarantee a successful transfer, even if the flow file
is routed to the success relationship.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description
|
+|----------|---------------|------------------|-------------------------------------------------------------------------------------------|
+| **Hostname** | localhost | | The ip address or hostname of
the destination.<br/>**Supports Expression Language: true** |
+| **Port** | | | The port on the
destination. Can be a service name like ssh or http, as defined in /etc/services.<br/>**Supports Expression Language: true** |
+
+### Relationships
+
+| Name | Description
|
+|---------|-------------------------------------------------------------------------------|
+| success | FlowFiles that are sent to the destination are sent out this
relationship. |
+| failure | FlowFiles that encounter IO or network errors are sent out this
relationship. |
+
## QueryDatabaseTable
### Description
diff --git a/README.md b/README.md
index 57a84ae..934478b 100644
--- a/README.md
+++ b/README.md
@@ -66,7 +66,7 @@ The following table lists the base set of processors.
| Extension Set | Processors |
| ------------- |:-------------|
-| **Base** |
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>
[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P
[...]
+| **Base** |
[AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>
[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](PROCESSORS.md#logattribute)<br/>[PutFile](P
[...]
The next table outlines CMAKE flags that correspond with MiNiFi extensions.
Extensions that are enabled by default ( such as CURL ), can be disabled with
the respective CMAKE flag on the command line.
diff --git a/cmake/ExpectedLite.cmake b/cmake/ExpectedLite.cmake
new file mode 100644
index 0000000..c89551c
--- /dev/null
+++ b/cmake/ExpectedLite.cmake
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+include(FetchContent)
+
+FetchContent_Declare(expected-lite
+ URL
https://github.com/martinmoene/expected-lite/archive/refs/tags/v0.5.0.tar.gz
+ URL_HASH
SHA256=80f8c91d228cdc5cac3698141c0321d51dcdb0239c2fdcdeae7d46a9a58f2297
+)
+FetchContent_MakeAvailable(expected-lite)
diff --git a/extensions/expression-language/ProcessContextExpr.cpp
b/extensions/expression-language/ProcessContextExpr.cpp
index 08e515b..1f6ec93 100644
--- a/extensions/expression-language/ProcessContextExpr.cpp
+++ b/extensions/expression-language/ProcessContextExpr.cpp
@@ -37,10 +37,12 @@ bool ProcessContextExpr::getProperty(const Property
&property, std::string &valu
}
logger_->log_debug("Compiling expression for %s/%s: %s",
getProcessorNode()->getName(), name, expression_str);
expressions_.emplace(name, expression::compile(expression_str));
+ expression_strs_.insert_or_assign(name, expression_str);
}
minifi::expression::Parameters p(shared_from_this(), flow_file);
value = expressions_[name](p).asString();
+ logger_->log_debug(R"(expression "%s" of property "%s" evaluated to: %s)",
expression_strs_[name], name, value);
return true;
}
@@ -54,9 +56,11 @@ bool ProcessContextExpr::getDynamicProperty(const Property
&property, std::strin
ProcessContext::getDynamicProperty(name, expression_str);
logger_->log_debug("Compiling expression for %s/%s: %s",
getProcessorNode()->getName(), name, expression_str);
dynamic_property_expressions_.emplace(name,
expression::compile(expression_str));
+ expression_strs_.insert_or_assign(name, expression_str);
}
minifi::expression::Parameters p(shared_from_this(), flow_file);
value = dynamic_property_expressions_[name](p).asString();
+ logger_->log_debug(R"(expression "%s" of dynamic property "%s" evaluated to:
%s)", expression_strs_[name], name, value);
return true;
}
diff --git a/extensions/expression-language/ProcessContextExpr.h
b/extensions/expression-language/ProcessContextExpr.h
index 192433d..31f48ef 100644
--- a/extensions/expression-language/ProcessContextExpr.h
+++ b/extensions/expression-language/ProcessContextExpr.h
@@ -19,7 +19,7 @@
#include <ProcessContext.h>
#include <memory>
-#include <map>
+#include <unordered_map>
#include <string>
#include "impl/expression/Expression.h"
@@ -35,7 +35,7 @@ namespace core {
* state. With this case, we can rely on instantiation of a builder to create
the necessary
* ProcessContext. *
*/
-class ProcessContextExpr : public core::ProcessContext {
+class ProcessContextExpr final : public core::ProcessContext {
public:
/**
std::forward of argument list did not work on all platform.
@@ -54,7 +54,7 @@ class ProcessContextExpr : public core::ProcessContext {
logger_(logging::LoggerFactory<ProcessContextExpr>::getLogger()) {
}
// Destructor
- virtual ~ProcessContextExpr() = default;
+ ~ProcessContextExpr() override = default;
/**
* Retrieves property using EL
* @param property property
@@ -65,11 +65,10 @@ class ProcessContextExpr : public core::ProcessContext {
bool getDynamicProperty(const Property &property, std::string &value, const
std::shared_ptr<FlowFile> &flow_file) override;
- protected:
- std::map<std::string, org::apache::nifi::minifi::expression::Expression>
expressions_;
- std::map<std::string, org::apache::nifi::minifi::expression::Expression>
dynamic_property_expressions_;
-
private:
+ std::unordered_map<std::string,
org::apache::nifi::minifi::expression::Expression> expressions_;
+ std::unordered_map<std::string,
org::apache::nifi::minifi::expression::Expression>
dynamic_property_expressions_;
+ std::unordered_map<std::string, std::string> expression_strs_;
std::shared_ptr<logging::Logger> logger_;
};
diff --git a/extensions/standard-processors/processors/PutUDP.cpp
b/extensions/standard-processors/processors/PutUDP.cpp
new file mode 100644
index 0000000..b6f0e3a
--- /dev/null
+++ b/extensions/standard-processors/processors/PutUDP.cpp
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutUDP.h"
+
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <winsock2.h>
+#else
+#include <netdb.h>
+#endif /* WIN32 */
+#include <tuple>
+#include <utility>
+
+#include "range/v3/view/join.hpp"
+#include "range/v3/range/conversion.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "utils/net/DNS.h"
+#include "utils/StringUtils.h"
+#include "utils/net/Socket.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutUDP::Hostname =
core::PropertyBuilder::createProperty("Hostname")
+ ->withDescription("The ip address or hostname of the destination.")
+ ->withDefaultValue("localhost")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutUDP::Port =
core::PropertyBuilder::createProperty("Port")
+ ->withDescription("The port on the destination. Can be a service name like
ssh or http, as defined in /etc/services.")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Relationship PutUDP::Success{"success", "FlowFiles that are sent
to the destination are sent out this relationship."};
+const core::Relationship PutUDP::Failure{"failure", "FlowFiles that
encountered IO errors are send out this relationship."};
+
+PutUDP::PutUDP(const std::string& name, const utils::Identifier& uuid)
+ :Processor(name, uuid),
logger_{core::logging::LoggerFactory<PutUDP>::getLogger()}
+{ }
+
+PutUDP::~PutUDP() = default;
+
+void PutUDP::initialize() {
+ setSupportedProperties({
+ Hostname,
+ Port
+ });
+ setSupportedRelationships({
+ Success,
+ Failure
+ });
+}
+
+void PutUDP::notifyStop() {}
+
+void PutUDP::onSchedule(core::ProcessContext* const context,
core::ProcessSessionFactory*) {
+ gsl_Expects(context);
+
+ // if the required properties are missing or empty even before evaluating
the EL expression, then we can throw in onSchedule, before we waste any flow
files
+ if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+ }
+ if (context->getProperty(Port).value_or(std::string{}).empty()) {
+ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+ }
+}
+
+void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession*
const session) {
+ gsl_Expects(context && session);
+
+ const auto flow_file = session->get();
+ if (!flow_file) {
+ yield();
+ return;
+ }
+
+ const auto hostname = context->getProperty(Hostname,
flow_file).value_or(std::string{});
+ const auto port = context->getProperty(Port,
flow_file).value_or(std::string{});
+ if (hostname.empty() || port.empty()) {
+ logger_->log_error("[%s] invalid target endpoint: hostname: %s, port: %s",
flow_file->getUUIDStr(),
+ hostname.empty() ? "(empty)" : hostname.c_str(),
+ port.empty() ? "(empty)" : port.c_str());
+ session->transfer(flow_file, Failure);
+ return;
+ }
+
+ const auto data = session->readBuffer(flow_file);
+ if (data.status < 0) {
+ session->transfer(flow_file, Failure);
+ return;
+ }
+
+ const auto nonthrowing_sockaddr_ntop = [](const sockaddr* const sa) ->
std::string {
+ return utils::try_expression([sa] { return utils::net::sockaddr_ntop(sa);
}).value_or("(n/a)");
+ };
+
+ const auto debug_log_resolved_names = [&, this](const addrinfo& names) ->
decltype(auto) {
+ if (logger_->should_log(core::logging::LOG_LEVEL::debug)) {
+ std::vector<std::string> names_vector;
+ for (const addrinfo* it = &names; it; it = it->ai_next) {
+ names_vector.push_back(nonthrowing_sockaddr_ntop(it->ai_addr));
+ }
+ logger_->log_debug("resolved \'%s\' to: %s",
+ hostname,
+ names_vector | ranges::views::join(',') | ranges::to<std::string>());
+ }
+ return names;
+ };
+
+ utils::net::resolveHost(hostname.c_str(), port.c_str(),
utils::net::IpProtocol::Udp)
+ | utils::map(utils::dereference)
+ | utils::map(debug_log_resolved_names)
+ | utils::flatMap([](const auto& names) { return
utils::net::open_socket(names); })
+ | utils::flatMap([&, this](utils::net::OpenSocketResult
socket_handle_and_selected_name) -> nonstd::expected<void, std::error_code> {
+ const auto& [socket_handle, selected_name] =
socket_handle_and_selected_name;
+ logger_->log_debug("connected to %s",
nonthrowing_sockaddr_ntop(selected_name->ai_addr));
+#ifdef WIN32
+ const char* const buffer_ptr = reinterpret_cast<const
char*>(data.buffer.data());
+#else
+ const void* const buffer_ptr = data.buffer.data();
+#endif
+ const auto send_result = ::sendto(socket_handle.get(), buffer_ptr,
data.buffer.size(), 0, selected_name->ai_addr, selected_name->ai_addrlen);
+ logger_->log_trace("sendto returned %ld",
static_cast<long>(send_result)); // NOLINT: sendto
+ if (send_result == utils::net::SocketError) {
+ return nonstd::make_unexpected(utils::net::get_last_socket_error());
+ }
+ session->transfer(flow_file, Success);
+ return {};
+ })
+ | utils::orElse([&, this](std::error_code ec) {
+ gsl_Expects(ec);
+ logger_->log_error("%s", ec.message());
+ session->transfer(flow_file, Failure);
+ });
+}
+
+REGISTER_RESOURCE(PutUDP, "The PutUDP processor receives a FlowFile and
packages the FlowFile content into a single UDP datagram packet which is then
transmitted to the configured UDP server. "
+ "The processor doesn't guarantee a successful
transfer, even if the flow file is routed to the success relationship.");
+
+} // namespace org::apache::nifi::minifi::processors
+
diff --git a/extensions/standard-processors/processors/PutUDP.h
b/extensions/standard-processors/processors/PutUDP.h
new file mode 100644
index 0000000..c8ba6e9
--- /dev/null
+++ b/extensions/standard-processors/processors/PutUDP.h
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "Processor.h"
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::core::logging { class Logger; }
+
+namespace org::apache::nifi::minifi::processors {
+class PutUDP final : public core::Processor {
+ public:
+ EXTENSIONAPI static const core::Property Hostname;
+ EXTENSIONAPI static const core::Property Port;
+
+ EXTENSIONAPI static const core::Relationship Success;
+ EXTENSIONAPI static const core::Relationship Failure;
+
+ explicit PutUDP(const std::string& name, const utils::Identifier& uuid = {});
+ PutUDP(const PutUDP&) = delete;
+ PutUDP& operator=(const PutUDP&) = delete;
+ ~PutUDP() final;
+
+ void initialize() final;
+ void notifyStop() final;
+ void onSchedule(core::ProcessContext*, core::ProcessSessionFactory *) final;
+ void onTrigger(core::ProcessContext*, core::ProcessSession*) final;
+
+ core::annotation::Input getInputRequirement() const noexcept final { return
core::annotation::Input::INPUT_REQUIRED; }
+ bool isSingleThreaded() const noexcept final { return true; /* for now */ }
+ private:
+ std::string hostname_;
+ std::string port_;
+ std::shared_ptr<core::logging::Logger> logger_;
+};
+} // namespace org::apache::nifi::minifi::processors
diff --git
a/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
b/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
index 2ad3d01..b52a4e9 100644
---
a/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
+++
b/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp
@@ -37,6 +37,7 @@
#include "properties/Configure.h"
#include "io/tls/TLSSocket.h"
#include "io/tls/TLSServerSocket.h"
+#include "utils/net/Socket.h"
namespace minifi = org::apache::nifi::minifi;
@@ -86,7 +87,7 @@ std::string str_addr(const sockaddr* const sa) {
const auto addr_str = inet_ntop(AF_INET, &sin.sin_addr, buf,
sizeof(buf));
#endif
if (!addr_str) {
- throw std::runtime_error{minifi::io::get_last_socket_error_message()};
+ throw
std::runtime_error{minifi::utils::net::get_last_socket_error().message()};
}
return std::string{addr_str};
}
@@ -99,7 +100,7 @@ std::string str_addr(const sockaddr* const sa) {
const auto addr_str = inet_ntop(AF_INET, &sin6.sin6_addr, buf,
sizeof(buf));
#endif
if (!addr_str) {
- throw std::runtime_error{minifi::io::get_last_socket_error_message()};
+ throw
std::runtime_error{minifi::utils::net::get_last_socket_error().message()};
}
return std::string{addr_str};
}
@@ -167,14 +168,14 @@ class SimpleSSLTestClient {
log_addrinfo(addr, logger);
sfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (sfd == INVALID_SOCKET) {
- logger.log_error("socket: %s\n",
minifi::io::get_last_socket_error_message());
+ logger.log_error("socket: %s\n",
minifi::utils::net::get_last_socket_error().message());
continue;
}
const auto connect_result = connect(sfd, addr->ai_addr,
addr->ai_addrlen);
if (connect_result == 0) {
break;
} else {
- logger.log_error("connect to %s: %s\n", str_addr(addr->ai_addr),
minifi::io::get_last_socket_error_message());
+ logger.log_error("connect to %s: %s\n", str_addr(addr->ai_addr),
minifi::utils::net::get_last_socket_error().message());
}
sfd = INVALID_SOCKET;
#ifdef WIN32
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
new file mode 100644
index 0000000..787ebe0
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <new>
+#include <random>
+#include <string>
+#include "SingleInputTestController.h"
+#include "PutUDP.h"
+#include "utils/net/DNS.h"
+#include "utils/net/Socket.h"
+#include "utils/expected.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+namespace {
+struct DatagramListener {
+ DatagramListener(const char* const hostname, const char* const port)
+ :resolved_names_{utils::net::resolveHost(hostname, port,
utils::net::IpProtocol::Udp).value()},
+ open_socket_{utils::net::open_socket(*resolved_names_)
+ | utils::valueOrElse([=]() -> utils::net::OpenSocketResult { throw
std::runtime_error{utils::StringUtils::join_pack("Failed to connect to ",
hostname, " on port ", port)}; })}
+ {
+ const auto bind_result = bind(open_socket_.socket_.get(),
open_socket_.selected_name->ai_addr, open_socket_.selected_name->ai_addrlen);
+ if (bind_result == utils::net::SocketError) {
+ throw std::runtime_error{utils::StringUtils::join_pack("bind: ",
utils::net::get_last_socket_error().message())};
+ }
+ }
+
+ struct ReceiveResult {
+ std::string remote_address;
+ std::string message;
+ };
+
+ [[nodiscard]] ReceiveResult receive(const size_t max_message_size = 8192)
const {
+ ReceiveResult result;
+ result.message.resize(max_message_size);
+ sockaddr_storage remote_address{};
+ socklen_t addrlen = sizeof(remote_address);
+ const auto recv_result = recvfrom(open_socket_.socket_.get(),
result.message.data(), result.message.size(), 0,
std::launder(reinterpret_cast<sockaddr*>(&remote_address)), &addrlen);
+ if (recv_result == utils::net::SocketError) {
+ throw std::runtime_error{utils::StringUtils::join_pack("recvfrom: ",
utils::net::get_last_socket_error().message())};
+ }
+ result.message.resize(gsl::narrow<size_t>(recv_result));
+ result.remote_address =
utils::net::sockaddr_ntop(std::launder(reinterpret_cast<sockaddr*>(&remote_address)));
+ return result;
+ }
+
+ std::unique_ptr<addrinfo, utils::net::addrinfo_deleter> resolved_names_;
+ utils::net::OpenSocketResult open_socket_;
+};
+} // namespace
+
+// Testing the failure relationship is not required: since UDP in
general comes without guarantees, flow files are always routed to success, unless
there is
+// some weird IO error with the content repo.
+TEST_CASE("PutUDP", "[putudp]") {
+ const auto putudp = std::make_shared<PutUDP>("PutUDP");
+ auto random_engine = std::mt19937{std::random_device{}()}; // NOLINT:
"Missing space before { [whitespace/braces] [5]"
+ // most systems use ports 32768 - 65535 as ephemeral ports, so avoid binding
to those
+ const auto port = std::uniform_int_distribution<uint16_t>{10000, 32768 -
1}(random_engine);
+ const auto port_str = std::to_string(port);
+
+ test::SingleInputTestController controller{putudp};
+ LogTestController::getInstance().setTrace<PutUDP>();
+ LogTestController::getInstance().setTrace<core::ProcessContext>();
+ LogTestController::getInstance().setLevelByClassName(spdlog::level::trace,
"org::apache::nifi::minifi::core::ProcessContextExpr");
+ putudp->setProperty(PutUDP::Hostname, "${literal('localhost')}");
+ putudp->setProperty(PutUDP::Port,
utils::StringUtils::join_pack("${literal('", port_str, "')}"));
+
+ DatagramListener listener{"localhost", port_str.c_str()};
+
+ {
+ const char* const message = "first message: hello";
+ const auto result = controller.trigger(message);
+ const auto& success_flow_files = result.at(PutUDP::Success);
+ REQUIRE(success_flow_files.size() == 1);
+ REQUIRE(result.at(PutUDP::Failure).empty());
+ REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
+ auto receive_result = listener.receive();
+ REQUIRE(receive_result.message == message);
+ REQUIRE(!receive_result.remote_address.empty());
+ }
+
+ {
+ const char* const message = "longer message
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
[...]
+ const auto result = controller.trigger(message);
+ const auto& success_flow_files = result.at(PutUDP::Success);
+ REQUIRE(success_flow_files.size() == 1);
+ REQUIRE(result.at(PutUDP::Failure).empty());
+ REQUIRE(controller.plan->getContent(success_flow_files[0]) == message);
+ auto receive_result = listener.receive();
+ REQUIRE(receive_result.message == message);
+ REQUIRE(!receive_result.remote_address.empty());
+ }
+}
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp
b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 8e7eb08..ab99255 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -586,7 +586,7 @@ TEST_CASE("TailFile can handle input files getting
removed", "[multiple_file]")
TEST_CASE("TailFile processes a very long line correctly", "[simple]") {
std::string line1("012\n");
std::string line2(8050, 0);
- std::mt19937 gen(std::random_device{}()); // NOLINT (linter wants a space
before '{')
+ std::mt19937 gen(std::random_device{}()); // NOLINT (linter wants a space
before '{') [whitespace/braces]
std::generate_n(line2.begin(), line2.size() - 1, [&]() -> char {
// Make sure to only generate from characters that don't intersect with
line1 and 3-4
// Starting generation from 64 ensures that no numeric digit characters
are added
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 6f96f96..d10b884 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -57,7 +57,7 @@ if (NOT OPENSSL_OFF)
set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
endif()
-file(GLOB SOURCES "src/agent/agent_docs.cpp" "src/properties/*.cpp"
"src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp"
"src/core/logging/internal/*.cpp" "src/core/state/*.cpp"
"src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp"
"src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES}
"src/core/controller/*.cpp" "src/controllers/*.cpp"
"src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp"
"src/core/yaml/*.cpp" " [...]
+file(GLOB SOURCES "src/agent/agent_docs.cpp" "src/properties/*.cpp"
"src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp"
"src/core/logging/internal/*.cpp" "src/core/state/*.cpp"
"src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp"
"src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES}
"src/core/controller/*.cpp" "src/controllers/*.cpp"
"src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp"
"src/core/yaml/*.cpp" " [...]
# manually add this as it might not yet be present when this executes
list(APPEND SOURCES "src/agent/agent_version.cpp")
@@ -98,7 +98,7 @@ if(NOT EXCLUDE_BOOST)
endif()
include(RangeV3)
-list(APPEND LIBMINIFI_LIBRARIES yaml-cpp ZLIB::ZLIB concurrentqueue RapidJSON
spdlog cron Threads::Threads gsl-lite libsodium range-v3)
+list(APPEND LIBMINIFI_LIBRARIES yaml-cpp ZLIB::ZLIB concurrentqueue RapidJSON
spdlog cron Threads::Threads gsl-lite libsodium range-v3 expected-lite)
if(NOT WIN32)
list(APPEND LIBMINIFI_LIBRARIES OSSP::libuuid++)
endif()
diff --git a/libminifi/include/core/ConfigurableComponent.h
b/libminifi/include/core/ConfigurableComponent.h
index 86d588a..250ea4f 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -217,21 +217,21 @@ template<typename T>
bool ConfigurableComponent::getProperty(const std::string name, T &value)
const {
std::lock_guard<std::mutex> lock(configuration_mutex_);
- auto &&it = properties_.find(name);
- if (it != properties_.end()) {
- const Property& item = it->second;
- if (item.getValue().getValue() == nullptr) {
+ const auto property_name_and_object = properties_.find(name);
+ if (property_name_and_object != properties_.end()) {
+ const Property& property = property_name_and_object->second;
+ if (property.getValue().getValue() == nullptr) {
// empty value
- if (item.getRequired()) {
- logger_->log_error("Component %s required property %s is empty", name,
item.getName());
- throw utils::internal::RequiredPropertyMissingException("Required
property is empty: " + item.getName());
+ if (property.getRequired()) {
+ logger_->log_error("Component %s required property %s is empty", name,
property.getName());
+ throw utils::internal::RequiredPropertyMissingException("Required
property is empty: " + property.getName());
}
- logger_->log_debug("Component %s property name %s, empty value", name,
item.getName());
+ logger_->log_debug("Component %s property name %s, empty value", name,
property.getName());
return false;
}
- logger_->log_debug("Component %s property name %s value %s", name,
item.getName(), item.getValue().to_string());
+ logger_->log_debug("Component %s property name %s value %s", name,
property.getName(), property.getValue().to_string());
// cast throws if the value is invalid
- value = static_cast<T>(item.getValue());
+ value = static_cast<T>(property.getValue());
return true;
} else {
logger_->log_warn("Could not find property %s", name);
diff --git a/libminifi/include/core/FlowFile.h
b/libminifi/include/core/FlowFile.h
index 15a3b94..6a794f9 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -53,7 +53,7 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
* Returns a pointer to this flow file record's
* claim
*/
- std::shared_ptr<ResourceClaim> getResourceClaim();
+ [[nodiscard]] std::shared_ptr<ResourceClaim> getResourceClaim() const;
/**
* Sets _claim to the inbound claim argument
*/
@@ -95,7 +95,7 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
* is marked as deleted.
* @return marked deleted
*/
- bool isDeleted() const;
+ [[nodiscard]] bool isDeleted() const;
/**
* Sets whether to mark this flow file record
@@ -108,24 +108,24 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
* Get entry date for this record
* @return entry date uint64_t
*/
- uint64_t getEntryDate() const;
+ [[nodiscard]] uint64_t getEntryDate() const;
/**
* Gets the event time.
* @return event time.
*/
- uint64_t getEventTime() const;
+ [[nodiscard]] uint64_t getEventTime() const;
/**
* Get lineage start date
* @return lineage start date uint64_t
*/
- uint64_t getlineageStartDate() const;
+ [[nodiscard]] uint64_t getlineageStartDate() const;
/**
* Sets the lineage start date
* @param date new lineage start date
*/
- void setLineageStartDate(const uint64_t date);
+ void setLineageStartDate(uint64_t date);
void setLineageIdentifiers(const std::vector<utils::Identifier>&
lineage_Identifiers) {
lineage_Identifiers_ = lineage_Identifiers;
@@ -139,7 +139,7 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
*/
bool getAttribute(const std::string& key, std::string& value) const;
- std::optional<std::string> getAttribute(const std::string& key) const;
+ [[nodiscard]] std::optional<std::string> getAttribute(const std::string&
key) const;
/**
* Updates the value in the attribute map that corresponds
@@ -171,7 +171,7 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
* Returns the map of attributes
* @return attributes.
*/
- std::map<std::string, std::string> getAttributes() const {
+ [[nodiscard]] std::map<std::string, std::string> getAttributes() const {
return {attributes_.begin(), attributes_.end()};
}
@@ -200,7 +200,7 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
* Returns the size of corresponding flow file
* @return size as a uint64_t
*/
- uint64_t getSize() const;
+ [[nodiscard]] uint64_t getSize() const;
/**
* Sets the offset
@@ -215,7 +215,7 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
to_be_processed_after_ = std::chrono::steady_clock::now() + duration;
}
- std::chrono::time_point<std::chrono::steady_clock> getPenaltyExpiration()
const {
+ [[nodiscard]] std::chrono::time_point<std::chrono::steady_clock>
getPenaltyExpiration() const {
return to_be_processed_after_;
}
@@ -223,13 +223,13 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
* Gets the offset within the flow file
* @return size as a uint64_t
*/
- uint64_t getOffset() const;
+ [[nodiscard]] uint64_t getOffset() const;
- bool isPenalized() const {
+ [[nodiscard]] bool isPenalized() const {
return to_be_processed_after_ > std::chrono::steady_clock::now();
}
- uint64_t getId() const {
+ [[nodiscard]] uint64_t getId() const {
return id_;
}
@@ -242,13 +242,13 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
* Returns the original connection referenced by this record.
* @return shared original connection pointer.
*/
- std::shared_ptr<core::Connectable> getConnection() const;
+ [[nodiscard]] std::shared_ptr<core::Connectable> getConnection() const;
void setStoredToRepository(bool storedInRepository) {
stored = storedInRepository;
}
- bool isStored() const {
+ [[nodiscard]] bool isStored() const {
return stored;
}
diff --git a/libminifi/include/core/ProcessContext.h
b/libminifi/include/core/ProcessContext.h
index 8ce37f6..271f70c 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -113,10 +113,14 @@ class ProcessContext : public
controller::ControllerServiceLookup, public core::
}
template<typename T>
- bool getProperty(const std::string &name, T &value) const {
+ std::enable_if_t<!std::is_convertible_v<T&, const FlowFile&> &&
!std::is_convertible_v<T&, const std::shared_ptr<FlowFile>&>,
+ bool> getProperty(const std::string &name, T &value) const {
return getPropertyImp<typename std::common_type<T>::type>(name, value);
}
+ template<typename T = std::string>
+ std::enable_if_t<std::is_default_constructible_v<T>, std::optional<T>>
getProperty(const Property&, const std::shared_ptr<FlowFile>&);
+
virtual bool getProperty(const Property &property, std::string &value, const
std::shared_ptr<FlowFile>& /*flow_file*/) {
return getProperty(property.getName(), value);
}
@@ -394,6 +398,26 @@ class ProcessContext : public
controller::ControllerServiceLookup, public core::
bool initialized_;
};
+template<typename T>  // Typed lookup: fetches the property as a string, then converts it to T; std::nullopt on any failure.
+inline std::enable_if_t<std::is_default_constructible_v<T>, std::optional<T>>
ProcessContext::getProperty(const Property& property, const
std::shared_ptr<FlowFile>& flow_file) {
+ T value;  // only returned after convertValue below reports success
+ std::string string_value;
+ if (!getProperty(property, string_value, flow_file)) return std::nullopt;  // string lookup reported failure
+ try {
+ if (!state::response::Value{string_value}.template convertValue(value))
return std::nullopt;
+ } catch (const utils::internal::ValueException&) {  // a conversion exception is reported as nullopt, not propagated
+ return std::nullopt;
+ }
+ return value;
+}
+
+template<>  // std::string specialization: returns the looked-up string directly, with no convertValue step.
+inline std::optional<std::string>
ProcessContext::getProperty<std::string>(const Property& property, const
std::shared_ptr<FlowFile>& flow_file) {
+ std::string value;
+ if (!getProperty(property, value, flow_file)) return std::nullopt;  // lookup reported failure
+ return value;
+}
+
} // namespace core
} // namespace minifi
} // namespace nifi
diff --git a/libminifi/include/core/ProcessSession.h
b/libminifi/include/core/ProcessSession.h
index af54e95..63391f8 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -44,6 +44,14 @@ namespace apache {
namespace nifi {
namespace minifi {
namespace core {
+namespace detail {
+struct ReadBufferResult {  // Return type of ProcessSession::readBuffer: a status plus a byte buffer.
+ int64_t status;  // NOTE(review): presumably the same kind of value ProcessSession::read returns - confirm in ProcessSession.cpp
+ std::vector<std::byte> buffer;  // NOTE(review): presumably the flow file content that was read - confirm in ProcessSession.cpp
+};
+
+std::string to_string(const ReadBufferResult& read_buffer_result);  // declaration only; no definition in this header hunk
+} // namespace detail
// ProcessSession Class
class ProcessSession : public ReferenceContainer {
@@ -88,10 +96,11 @@ class ProcessSession : public ReferenceContainer {
void remove(const std::shared_ptr<core::FlowFile> &flow);
// Execute the given read callback against the content
int64_t read(const std::shared_ptr<core::FlowFile> &flow,
InputStreamCallback *callback);
-
int64_t read(const std::shared_ptr<core::FlowFile> &flow,
InputStreamCallback&& callback) {
return read(flow, &callback);
}
+ // Read content into buffer
+ detail::ReadBufferResult readBuffer(const std::shared_ptr<core::FlowFile>&
flow);
// Execute the given write callback against the content
void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback
*callback);
diff --git a/libminifi/include/core/Relationship.h
b/libminifi/include/core/Relationship.h
index bda280e..39aba57 100644
--- a/libminifi/include/core/Relationship.h
+++ b/libminifi/include/core/Relationship.h
@@ -62,7 +62,13 @@ class Relationship {
std::string name_ = "undefined";
std::string description_;
};
-
} // namespace org::apache::nifi::minifi::core
+template<>  // std::hash support for core::Relationship; the hash is derived from the relationship name alone.
+struct std::hash<org::apache::nifi::minifi::core::Relationship> {
+ size_t operator()(const org::apache::nifi::minifi::core::Relationship&
relationship) const noexcept {
+ return std::hash<std::string>{}(relationship.getName());  // description_ does not contribute to the hash
+ }
+};
+
#endif // LIBMINIFI_INCLUDE_CORE_RELATIONSHIP_H_
diff --git a/libminifi/include/io/ClientSocket.h
b/libminifi/include/io/ClientSocket.h
index 85ea6b9..966dc71 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -20,17 +20,6 @@
#include <utility>
#include <cstdint>
-#ifdef WIN32
-#ifndef WIN32_LEAN_AND_MEAN
-#define WIN32_LEAN_AND_MEAN
-#endif /* WIN32_LEAN_AND_MEAN */
-#include <WinSock2.h>
-#else
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <unistd.h>
-#endif /* WIN32 */
#include <mutex>
#include <atomic>
#include <string>
@@ -43,6 +32,7 @@
#include "io/validation.h"
#include "properties/Configure.h"
#include "io/NetworkPrioritizer.h"
+#include "utils/net/Socket.h"
namespace org {
namespace apache {
@@ -50,22 +40,10 @@ namespace nifi {
namespace minifi {
namespace io {
-#ifdef WIN32
-using SocketDescriptor = SOCKET;
-using ip4addr = in_addr;
-#else
-using SocketDescriptor = int;
-using ip4addr = in_addr_t;
-#undef INVALID_SOCKET
-static constexpr SocketDescriptor INVALID_SOCKET = -1;
-#undef SOCKET_ERROR
-static constexpr int SOCKET_ERROR = -1;
-#endif /* WIN32 */
-
-/**
- * Return the last socket error message, based on errno on posix and
WSAGetLastError() on windows
- */
-std::string get_last_socket_error_message();
+using utils::net::SocketDescriptor;
+using utils::net::ip4addr;
+using utils::net::InvalidSocket;
+using utils::net::SocketError;
/**
* @return >= 0 on posix, != INVALID_SOCKET on windows
@@ -221,7 +199,7 @@ class Socket : public BaseStream {
io::NetworkInterface local_network_interface_;
// connection information
- SocketDescriptor socket_file_descriptor_{ INVALID_SOCKET }; // -1 on posix
+ SocketDescriptor socket_file_descriptor_{ InvalidSocket }; // -1 on posix
fd_set total_list_{};
fd_set read_fds_{};
diff --git a/libminifi/include/utils/Deleters.h
b/libminifi/include/utils/Deleters.h
index 9ae11e2..e33c3b8 100644
--- a/libminifi/include/utils/Deleters.h
+++ b/libminifi/include/utils/Deleters.h
@@ -21,12 +21,10 @@
#ifdef WIN32
#include <WS2tcpip.h>
#else
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
#include <ifaddrs.h>
#endif /* WIN32 */
#include <utility>
+#include "utils/net/DNS.h"
namespace org {
namespace apache {
@@ -66,11 +64,7 @@ struct StackAwareDeleter {
D impl_;
};
-struct addrinfo_deleter {
- void operator()(addrinfo* const p) const noexcept {
- freeaddrinfo(p);
- }
-};
+using net::addrinfo_deleter;
#ifndef WIN32
struct ifaddrs_deleter {
diff --git a/libminifi/include/utils/GeneralUtils.h
b/libminifi/include/utils/GeneralUtils.h
index a433403..5d8def7 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -92,12 +92,17 @@ struct EnableSharedFromThis : virtual
internal::EnableSharedFromThisBase {
namespace detail {
struct dereference_t {
+ template<typename T, typename =
std::enable_if_t<is_not_null_v<std::decay_t<T>>>>
+ decltype(auto) operator()(T&& ptr) const noexcept { return
*std::forward<T>(ptr); }
+};
+struct unsafe_dereference_t {
template<typename T>
- T &operator()(T *ptr) const noexcept { return *ptr; }
+ decltype(auto) operator()(T&& ptr) const noexcept { return
*std::forward<T>(ptr); }
};
} // namespace detail
constexpr detail::dereference_t dereference{};
+constexpr detail::unsafe_dereference_t unsafe_dereference{};
#if __cpp_lib_remove_cvref >= 201711L
using std::remove_cvref_t;
diff --git a/libminifi/include/utils/OptionalUtils.h
b/libminifi/include/utils/OptionalUtils.h
index cee564a..992493e 100644
--- a/libminifi/include/utils/OptionalUtils.h
+++ b/libminifi/include/utils/OptionalUtils.h
@@ -25,16 +25,13 @@
#include "utils/GeneralUtils.h"
#include "utils/gsl.h"
+#include "utils/detail/MonadicOperationWrappers.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
template<typename T>
-std::optional<utils::remove_cvref_t<T>> optional_from_ptr(T&& obj) {
- return obj == nullptr ? std::nullopt :
std::optional<utils::remove_cvref_t<T>>{ std::forward<T>(obj) };
+std::optional<gsl::not_null<utils::remove_cvref_t<T>>> optional_from_ptr(T&&
obj) {
+ return obj == nullptr ? std::nullopt :
std::optional<gsl::not_null<utils::remove_cvref_t<T>>>{
gsl::make_not_null(std::forward<T>(obj)) };
}
template<typename, typename = void>
@@ -44,11 +41,6 @@ template<typename T>
struct is_optional<std::optional<T>, void> : std::true_type {};
namespace detail {
-template<typename T>
-struct map_wrapper {
- T function;
-};
-
// map implementation
template<typename SourceType, typename F>
auto operator|(const std::optional<SourceType>& o, map_wrapper<F> f)
noexcept(noexcept(std::invoke(std::forward<F>(f.function), *o)))
@@ -70,11 +62,6 @@ auto operator|(std::optional<SourceType>&& o, map_wrapper<F>
f) noexcept(noexcep
}
}
-template<typename T>
-struct flat_map_wrapper {
- T function;
-};
-
// flatMap implementation
template<typename SourceType, typename F>
auto operator|(const std::optional<SourceType>& o, flat_map_wrapper<F> f)
noexcept(noexcept(std::invoke(std::forward<F>(f.function), *o)))
@@ -97,11 +84,6 @@ auto operator|(std::optional<SourceType>&& o,
flat_map_wrapper<F> f) noexcept(no
}
}
-template<typename T>
-struct or_else_wrapper {
- T function;
-};
-
// orElse implementation
template<typename SourceType, typename F>
auto operator|(std::optional<SourceType> o, or_else_wrapper<F> f)
noexcept(noexcept(std::invoke(std::forward<F>(f.function))))
@@ -123,22 +105,19 @@ auto operator|(std::optional<SourceType> o,
or_else_wrapper<F> f) noexcept(noexc
return std::invoke(std::forward<F>(f.function));
}
}
-} // namespace detail
-template<typename T>
-detail::map_wrapper<T&&> map(T&& func) noexcept { return
{std::forward<T>(func)}; }
-
-template<typename T>
-detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return
{std::forward<T>(func)}; }
-
-template<typename T>
-detail::or_else_wrapper<T&&> orElse(T&& func) noexcept { return
{std::forward<T>(func)}; }
-
-} // namespace utils
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+// valueOrElse implementation: yields the contained value when present, otherwise the result of invoking f.function with no arguments.
+template<typename SourceType, typename F>
+auto operator|(std::optional<SourceType> o, value_or_else_wrapper<F> f)
noexcept(noexcept(std::invoke(std::forward<F>(f.function))))
+ -> std::common_type_t<SourceType,
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>> {
+ if (o) {
+ return *std::move(o);  // o is a by-value parameter, so its content can be moved out
+ } else {
+ return std::invoke(std::forward<F>(f.function));  // fallback: the result is converted to the common type
+ }
+}
+} // namespace detail
+} // namespace org::apache::nifi::minifi::utils
#endif // LIBMINIFI_INCLUDE_UTILS_OPTIONALUTILS_H_
diff --git a/libminifi/include/utils/OsUtils.h
b/libminifi/include/utils/OsUtils.h
index bf3615a..a9b6253 100644
--- a/libminifi/include/utils/OsUtils.h
+++ b/libminifi/include/utils/OsUtils.h
@@ -54,7 +54,6 @@ std::string getMachineArchitecture();
extern std::string resolve_common_identifiers(const std::string &id);
#endif
-std::string sockaddr_ntop(const sockaddr* const sa);
} /* namespace OsUtils */
} /* namespace utils */
diff --git a/libminifi/include/utils/StringUtils.h
b/libminifi/include/utils/StringUtils.h
index a4e4e1c..7976097 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -36,6 +36,7 @@
#endif
#include "utils/FailurePolicy.h"
#include "utils/gsl.h"
+#include "utils/meta/detected.h"
namespace org::apache::nifi::minifi {
namespace utils {
@@ -213,24 +214,6 @@ class StringUtils {
static size_t size(const std::basic_string_view<CharT>& str) noexcept {
return str.size(); }
struct detail {
- // partial detection idiom impl, from cppreference.com
- struct nonesuch{};
-
- template<typename Default, typename Void, template<class...> class Op,
typename... Args>
- struct detector {
- using value_t = std::false_type;
- using type = Default;
- };
-
- template<typename Default, template<class...> class Op, typename... Args>
- struct detector<Default, std::void_t<Op<Args...>>, Op, Args...> {
- using value_t = std::true_type;
- using type = Op<Args...>;
- };
-
- template<template<class...> class Op, typename... Args>
- using is_detected = typename detector<nonesuch, void, Op,
Args...>::value_t;
-
// implementation detail of join_pack
template<typename CharT>
struct str_detector {
@@ -239,7 +222,7 @@ class StringUtils {
};
template<typename ResultT, typename CharT, typename... Strs>
- using valid_string_pack_t =
std::enable_if_t<(is_detected<str_detector<CharT>::template valid_string_t,
Strs>::value && ...), ResultT>;
+ using valid_string_pack_t =
std::enable_if_t<(meta::is_detected_v<str_detector<CharT>::template
valid_string_t, Strs> && ...), ResultT>;
template<typename CharT, typename... Strs, valid_string_pack_t<void,
CharT, Strs...>* = nullptr>
static std::basic_string<CharT> join_pack(const Strs&... strs) {
diff --git a/libminifi/include/utils/detail/MonadicOperationWrappers.h
b/libminifi/include/utils/detail/MonadicOperationWrappers.h
new file mode 100644
index 0000000..8950eb8
--- /dev/null
+++ b/libminifi/include/utils/detail/MonadicOperationWrappers.h
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <utility>
+
+namespace org::apache::nifi::minifi::utils {
+namespace detail {  // Tag types: each one wraps a callable so that operator| overloads can be selected by wrapper type.
+template<typename T>
+struct map_wrapper {  // produced by map()
+ T function;
+};
+
+template<typename T>
+struct flat_map_wrapper {  // produced by flatMap()
+ T function;
+};
+
+template<typename T>
+struct or_else_wrapper {  // produced by orElse()
+ T function;
+};
+
+template<typename T>
+struct value_or_else_wrapper {  // produced by valueOrElse()
+ T function;
+};
+
+} // namespace detail
+
+template<typename T>  // Factory: the wrapper is instantiated with T&&, so it stores a reference to func, not a copy.
+detail::map_wrapper<T&&> map(T&& func) noexcept { return
{std::forward<T>(func)}; }
+
+template<typename T>  // Factory: same reference-holding behaviour as map(), for flat_map_wrapper.
+detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return
{std::forward<T>(func)}; }
+
+template<typename T>  // Factory: same reference-holding behaviour as map(), for or_else_wrapper.
+detail::or_else_wrapper<T&&> orElse(T&& func) noexcept { return
{std::forward<T>(func)}; }
+
+template<typename T>  // Factory: same reference-holding behaviour as map(), for value_or_else_wrapper.
+detail::value_or_else_wrapper<T&&> valueOrElse(T&& func) noexcept { return
{std::forward<T>(func)}; }
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/expected.h
b/libminifi/include/utils/expected.h
new file mode 100644
index 0000000..c3bdce5
--- /dev/null
+++ b/libminifi/include/utils/expected.h
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <type_traits>
+#include <utility>
+#include "nonstd/expected.hpp"
+#include "utils/detail/MonadicOperationWrappers.h"
+#include "utils/GeneralUtils.h"
+#include "utils/meta/detected.h"
+
+namespace org::apache::nifi::minifi::utils {
+namespace detail {
+
+template<typename T>  // Trait: false for every type except the nonstd::expected specializations matched below.
+inline constexpr bool is_expected_v = false;
+
+template<typename V, typename E>  // Matches only the exact type nonstd::expected<V, E>; callers strip cv/ref qualifiers first.
+inline constexpr bool is_expected_v<nonstd::expected<V, E>> = true;
+
+// map implementation: applies f.function to the held value and wraps the result in an expected with the same error type; an error is passed through untouched.
+template<typename Expected, typename F, typename =
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
+auto operator|(Expected&& object, map_wrapper<F> f) {
+ using value_type = typename utils::remove_cvref_t<Expected>::value_type;
+ using error_type = typename utils::remove_cvref_t<Expected>::error_type;
+ if constexpr (std::is_same_v<value_type, void>) {  // expected<void, E>: the function is invoked with no arguments
+ using function_return_type =
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
+ if (!object.has_value()) {  // error case: the function is not invoked
+ return nonstd::expected<function_return_type,
error_type>{nonstd::unexpect, std::forward<Expected>(object).error()};
+ }
+ if constexpr (std::is_same_v<function_return_type, void>) {  // void-returning function: result is expected<void, E>
+ std::invoke(std::forward<F>(f.function));
+ return nonstd::expected<void, error_type>{};
+ } else {
+ return nonstd::expected<function_return_type,
error_type>{std::invoke(std::forward<F>(f.function))};
+ }
+ } else {  // non-void value: the function receives the dereferenced expected
+ using function_return_type =
std::decay_t<decltype(std::invoke(std::forward<F>(f.function),
*std::forward<Expected>(object)))>;
+ if (!object.has_value()) {  // error case: the function is not invoked
+ return nonstd::expected<function_return_type,
error_type>{nonstd::unexpect, std::forward<Expected>(object).error()};
+ }
+ if constexpr (std::is_same_v<function_return_type, void>) {  // void-returning function: result is expected<void, E>
+ std::invoke(std::forward<F>(f.function),
*std::forward<Expected>(object));
+ return nonstd::expected<void, error_type>{};
+ } else {
+ return nonstd::expected<function_return_type,
error_type>{std::invoke(std::forward<F>(f.function),
*std::forward<Expected>(object))};
+ }
+ }
+}
+
+// flatMap: like map, but f.function must itself return an expected, which is returned as-is instead of being wrapped again.
+template<typename Expected, typename F, typename =
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
+auto operator|(Expected&& object, flat_map_wrapper<F> f) {
+ using value_type = typename utils::remove_cvref_t<Expected>::value_type;
+ if constexpr (std::is_same_v<value_type, void>) {  // expected<void, E>: the function is invoked with no arguments
+ using function_return_type =
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
+ static_assert(is_expected_v<function_return_type>, "flatMap expects a
function returning expected");
+ if (object.has_value()) {
+ return std::invoke(std::forward<F>(f.function));
+ } else {  // error case: the error is re-wrapped in the function's own return type
+ return function_return_type{nonstd::unexpect,
std::forward<Expected>(object).error()};
+ }
+ } else {  // non-void value: the function receives the dereferenced expected
+ using function_return_type =
std::decay_t<decltype(std::invoke(std::forward<F>(f.function),
*std::forward<Expected>(object)))>;
+ static_assert(is_expected_v<function_return_type>, "flatMap expects a
function returning expected");
+ if (object.has_value()) {
+ return std::invoke(std::forward<F>(f.function),
*std::forward<Expected>(object));
+ } else {  // error case: the error is re-wrapped in the function's own return type
+ return function_return_type{nonstd::unexpect,
std::forward<Expected>(object).error()};
+ }
+ }
+}
+
+template<typename Func, typename... Args>  // Detection-idiom operation: a valid type only if std::invoke(Func, Args...) is well-formed.
+using invocable_detector = decltype(std::invoke(std::declval<Func>(),
std::declval<Args>()...));
+
+// orElse: passes a value-holding expected through unchanged; otherwise invokes f.function, with the error if it accepts one, else with no arguments.
+template<typename Expected, typename F, typename =
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
+auto operator|(Expected&& object, or_else_wrapper<F> f) {
+ using error_type = typename utils::remove_cvref_t<Expected>::error_type;
+ if (object.has_value()) {  // value present: the function is never called
+ return std::forward<Expected>(object);
+ }
+ constexpr bool invocable_with_argument =
meta::is_detected_v<invocable_detector, F, error_type>;
+ if constexpr (std::is_same_v<error_type, void> || !invocable_with_argument) {  // function cannot take the error: call it with no arguments
+ constexpr bool invocable_with_no_argument =
meta::is_detected_v<invocable_detector, F>;
+ static_assert(invocable_with_no_argument);
+ using function_return_type =
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
+ static_assert(is_expected_v<function_return_type> ||
std::is_same_v<function_return_type, void>, "orElse expects a function
returning expected or void");
+ if constexpr (std::is_same_v<function_return_type, void>) {  // void handler: run it, then return the original error-holding expected
+ std::invoke(std::forward<F>(f.function));
+ return std::forward<Expected>(object);
+ } else {
+ return std::invoke(std::forward<F>(f.function));
+ }
+ } else {  // function accepts the error value
+ static_assert(invocable_with_argument);
+ using function_return_type =
std::decay_t<decltype(std::invoke(std::forward<F>(f.function),
std::forward<Expected>(object).error()))>;
+ static_assert(is_expected_v<function_return_type> ||
std::is_same_v<function_return_type, void>, "orElse expects a function
returning expected or void");
+ if constexpr (std::is_same_v<function_return_type, void>) {  // void handler: run it, then return the original error-holding expected
+ std::invoke(std::forward<F>(f.function),
std::forward<Expected>(object).error());
+ return std::forward<Expected>(object);
+ } else {
+ return std::invoke(std::forward<F>(f.function),
std::forward<Expected>(object).error());
+ }
+ }
+}
+
+// valueOrElse: returns the held value; on error, returns a value_type built from the result of f.function (called with the error if it accepts one).
+template<typename Expected, typename F, typename =
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
+typename utils::remove_cvref_t<Expected>::value_type operator|(Expected&&
object, value_or_else_wrapper<F> f) {
+ using value_type = typename utils::remove_cvref_t<Expected>::value_type;
+ using error_type = typename utils::remove_cvref_t<Expected>::error_type;
+ if (object.has_value()) {  // value present: the function is never called
+ return *std::forward<Expected>(object);
+ }
+ constexpr bool invocable_with_argument =
meta::is_detected_v<invocable_detector, F, error_type>;
+ if constexpr (std::is_same_v<error_type, void> || !invocable_with_argument) {  // function cannot take the error: call it with no arguments
+ constexpr bool invocable_with_no_argument =
meta::is_detected_v<invocable_detector, F>;
+ static_assert(invocable_with_no_argument);
+ using function_return_type =
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
+ static_assert((std::is_same_v<function_return_type, void> &&
std::is_default_constructible_v<value_type>) ||
std::is_constructible_v<value_type, function_return_type>,
+ "valueOrElse expects a function returning value_type or void");
+ if constexpr (std::is_same_v<function_return_type, void>) {  // void handler: run it, then fall back to a default-constructed value
+ std::invoke(std::forward<F>(f.function));
+ return value_type{};
+ } else {
+ return value_type{std::invoke(std::forward<F>(f.function))};
+ }
+ } else {  // function accepts the error value
+ static_assert(invocable_with_argument);
+ using function_return_type =
std::decay_t<decltype(std::invoke(std::forward<F>(f.function),
std::forward<Expected>(object).error()))>;
+ static_assert((std::is_same_v<function_return_type, void> &&
std::is_default_constructible_v<value_type>) ||
std::is_constructible_v<value_type, function_return_type>,
+ "valueOrElse expects a function returning value_type or void");
+ if constexpr (std::is_same_v<function_return_type, void>) {  // void handler: run it, then fall back to a default-constructed value
+ std::invoke(std::forward<F>(f.function),
std::forward<Expected>(object).error());
+ return value_type{};
+ } else {
+ return value_type{std::invoke(std::forward<F>(f.function),
std::forward<Expected>(object).error())};
+ }
+ }
+}
+} // namespace detail
+
+template<typename F, typename... Args>  // Invokes action(args...) and turns a thrown exception into the error side of the returned expected.
+auto try_expression(F&& action, Args&&... args) noexcept {
+ using action_return_type =
std::decay_t<decltype(std::invoke(std::forward<F>(action),
std::forward<Args>(args)...))>;
+ using return_type = nonstd::expected<action_return_type, std::exception_ptr>;
+ try {
+ if constexpr (std::is_same_v<action_return_type, void>) {  // void action: success is an empty expected<void, ...>
+ std::invoke(std::forward<F>(action), std::forward<Args>(args)...);
+ return return_type{};
+ } else {
+ return return_type{std::invoke(std::forward<F>(action),
std::forward<Args>(args)...)};
+ }
+ } catch (...) {  // any exception type is captured via std::current_exception
+ return return_type{nonstd::unexpect, std::current_exception()};
+ }
+}
+
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/gsl.h b/libminifi/include/utils/gsl.h
index 9bd4f09..01aae19 100644
--- a/libminifi/include/utils/gsl.h
+++ b/libminifi/include/utils/gsl.h
@@ -46,6 +46,13 @@ Container<detail::remove_cvref_t<T>> span_to(gsl::span<T>
span) {
"The destination container must have an iterator (pointer) range
constructor");
return span_to<Container<detail::remove_cvref_t<T>>>(span);
}
+
+template<typename T>  // Trait: false for every type except the gsl::not_null specializations matched below.
+struct is_not_null : std::false_type {};
+template<typename T>  // Matches only the exact type gsl::not_null<T>; no cv/ref stripping is done here.
+struct is_not_null<gsl::not_null<T>> : std::true_type {};
+template<typename T>  // Variable-template shorthand for is_not_null<T>::value.
+inline constexpr bool is_not_null_v = is_not_null<T>::value;
} // namespace utils
} // namespace minifi
diff --git a/libminifi/include/utils/meta/detected.h
b/libminifi/include/utils/meta/detected.h
new file mode 100644
index 0000000..844d3c1
--- /dev/null
+++ b/libminifi/include/utils/meta/detected.h
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <type_traits>
+
+namespace org::apache::nifi::minifi::utils::meta {
+// detection idiom impl, from cppreference.com
+struct nonesuch{};  // placeholder type that detected_t yields when Op<Args...> is not a valid type
+
+namespace detail {
+template<typename Default, typename Void, template<class...> class Op,
typename... Args>
+struct detector {  // primary template: used when the specialization below does not match
+ using value_t = std::false_type;
+ using type = Default;
+};
+
+template<typename Default, template<class...> class Op, typename... Args>
+struct detector<Default, std::void_t<Op<Args...>>, Op, Args...> {  // selected only when Op<Args...> names a valid type
+ using value_t = std::true_type;
+ using type = Op<Args...>;
+};
+} // namespace detail
+
+template<template<class...> class Op, typename... Args>  // std::true_type if Op<Args...> is valid, std::false_type otherwise
+using is_detected = typename detail::detector<nonesuch, void, Op,
Args...>::value_t;
+
+template<template<class...> class Op, typename... Args>  // bool shorthand for is_detected<Op, Args...>::value
+inline constexpr bool is_detected_v = is_detected<Op, Args...>::value;
+
+template<template<class...> class Op, typename... Args>  // Op<Args...> if valid, nonesuch otherwise
+using detected_t = typename detail::detector<nonesuch, void, Op,
Args...>::type;
+
+template<typename Default, template<class...> class Op, typename... Args>  // the detector itself: exposes value_t and type, with Default as the fallback type
+using detected_or = detail::detector<Default, void, Op, Args...>;
+
+} // namespace org::apache::nifi::minifi::utils::meta
diff --git a/libminifi/include/utils/net/DNS.h
b/libminifi/include/utils/net/DNS.h
new file mode 100644
index 0000000..786eeeb
--- /dev/null
+++ b/libminifi/include/utils/net/DNS.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <memory>
+#include <string>
+#include <string_view>
+#include <system_error>
+#include "nonstd/expected.hpp"
+#include "utils/gsl.h"
+
+struct addrinfo;
+
+namespace org::apache::nifi::minifi::utils::net {
+// Deleter for std::unique_ptr<addrinfo, ...>; the definition lives in DNS.cpp.
+struct addrinfo_deleter {
+ void operator()(addrinfo*) const noexcept;
+};
+
+// Transport protocol to resolve addresses for.
+enum class IpProtocol {
+ Tcp,
+ Udp
+};
+
+/**
+ * Resolve hostname and port to a list of addrinfo entries.
+ * @param hostname host to resolve; the overloads without a hostname pass nullptr here
+ * @param port service name or port number as a string
+ * @param need_canonname whether the canonical name should be requested
+ * @return the owned addrinfo list on success, or an error code on failure
+ */
+nonstd::expected<gsl::not_null<std::unique_ptr<addrinfo, addrinfo_deleter>>,
std::error_code> resolveHost(const char* hostname, const char* port, IpProtocol
= IpProtocol::Tcp,
+ bool need_canonname = false);
+// Overload without a hostname: forwards nullptr as the hostname.
+inline auto resolveHost(const char* const port, const IpProtocol proto =
IpProtocol::Tcp, const bool need_canonname = false) {
+ return resolveHost(nullptr, port, proto, need_canonname);
+}
+// Overload taking a numeric port. The temporary string made by std::to_string lives until the end of
+// the full expression, so the c_str() pointer stays valid for the duration of the forwarded call.
+inline auto resolveHost(const char* const hostname, const uint16_t port, const
IpProtocol proto = IpProtocol::Tcp, const bool need_canonname = false) {
+ return resolveHost(hostname, std::to_string(port).c_str(), proto,
need_canonname);
+}
+// Overload with a numeric port and no hostname: forwards to the (hostname, uint16_t) overload with nullptr.
+inline auto resolveHost(const uint16_t port, const IpProtocol proto =
IpProtocol::Tcp, const bool need_canonname = false) {
+ return resolveHost(nullptr, port, proto, need_canonname);
+}
+} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/include/utils/net/Socket.h
b/libminifi/include/utils/net/Socket.h
new file mode 100644
index 0000000..65e92c4
--- /dev/null
+++ b/libminifi/include/utils/net/Socket.h
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <string>
+#include <system_error>
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif /* WIN32_LEAN_AND_MEAN */
+#include <WinSock2.h>
+#else
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#endif /* WIN32 */
+#include "nonstd/expected.hpp"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils::net {
+#ifdef WIN32
+// Winsock flavour: descriptors are SOCKET handles and the sentinels take the values of the Winsock macros.
+using SocketDescriptor = SOCKET;
+using ip4addr = in_addr;
+inline constexpr SocketDescriptor InvalidSocket = INVALID_SOCKET;
+// inline, like the other sentinels in this header, so every translation unit shares one definition
+inline constexpr int SocketError = SOCKET_ERROR;
+#else
+// Non-Windows flavour: descriptors are plain ints and -1 is used for both sentinels.
+using SocketDescriptor = int;
+using ip4addr = in_addr_t;
+// NOTE(review): the #undefs presumably keep the Winsock-style macro names from being used
+// in place of the typed constants below - confirm.
+#undef INVALID_SOCKET
+inline constexpr SocketDescriptor InvalidSocket = -1;
+#undef SOCKET_ERROR
+inline constexpr int SocketError = -1;
+#endif /* WIN32 */
+
+/**
+ * Return the last socket error code, based on errno on posix and
WSAGetLastError() on windows.
+ */
+std::error_code get_last_socket_error();
+
+/**
+ * Close a socket descriptor with the platform's call: closesocket() when WIN32 is defined, ::close() otherwise.
+ * The return value of the underlying call is discarded.
+ */
+inline void close_socket(SocketDescriptor sockfd) {
+#ifdef WIN32
+ closesocket(sockfd);
+#else
+ ::close(sockfd);
+#endif
+}
+
+/**
+ * Move-only owner of a socket descriptor.
+ * The owned descriptor is passed to close_socket() when the handle is destroyed or overwritten by
+ * move assignment, unless it equals InvalidSocket. A moved-from handle holds InvalidSocket.
+ */
+class UniqueSocketHandle {
+ public:
+  explicit UniqueSocketHandle(SocketDescriptor owner_sockfd) noexcept
+      :owner_sockfd_(owner_sockfd)
+  {}
+
+  UniqueSocketHandle(const UniqueSocketHandle&) = delete;
+  UniqueSocketHandle(UniqueSocketHandle&& other) noexcept
+      :owner_sockfd_{std::exchange(other.owner_sockfd_, InvalidSocket)}
+  {}
+  ~UniqueSocketHandle() noexcept {
+    if (owner_sockfd_ != InvalidSocket) close_socket(owner_sockfd_);
+  }
+  UniqueSocketHandle& operator=(const UniqueSocketHandle&) = delete;
+  UniqueSocketHandle& operator=(UniqueSocketHandle&& other) noexcept {
+    if (&other == this) return *this;
+    // release the descriptor currently owned before taking over the other one
+    if (owner_sockfd_ != InvalidSocket) close_socket(owner_sockfd_);
+    owner_sockfd_ = std::exchange(other.owner_sockfd_, InvalidSocket);
+    return *this;
+  }
+
+  /** @return the owned descriptor without giving up ownership */
+  [[nodiscard]] SocketDescriptor get() const noexcept { return owner_sockfd_; }
+  /** @return the owned descriptor; the handle holds InvalidSocket afterwards and will not close it */
+  [[nodiscard]] SocketDescriptor release() noexcept { return std::exchange(owner_sockfd_, InvalidSocket); }
+  /** @return true if the handle holds something other than InvalidSocket */
+  explicit operator bool() const noexcept { return owner_sockfd_ != InvalidSocket; }
+  // Taken by const reference: the type is not copyable, so a by-value parameter could only be
+  // initialized from an rvalue, and it would then own that descriptor and close it when the comparison returns.
+  bool operator==(const UniqueSocketHandle& other) const noexcept { return owner_sockfd_ == other.owner_sockfd_; }
+
+ private:
+  SocketDescriptor owner_sockfd_;
+};
+
+// Result of a successful open_socket() call.
+struct OpenSocketResult {
+ // owning handle of the newly created socket
+ UniqueSocketHandle socket_;
+ // the element of the addrinfo list passed to open_socket() for which socket() succeeded
+ gsl::not_null<const addrinfo*> selected_name;
+};
+
+/**
+ * Iterate through getaddrinfo_result and try to call socket() until it succeeds
+ * @param getaddrinfo_result head of the addrinfo list to try
+ * @return The owning socket handle and the selected list element on success, or the error code
+ * obtained from get_last_socket_error() if socket() failed for every element.
+ */
+nonstd::expected<OpenSocketResult, std::error_code>
open_socket(gsl::not_null<const addrinfo*> getaddrinfo_result);
+// Convenience overload taking the head of the list by reference.
+inline nonstd::expected<OpenSocketResult, std::error_code> open_socket(const
addrinfo& getaddrinfo_result) { return
open_socket(gsl::make_not_null(&getaddrinfo_result)); }
+
+
+/**
+ * Convert the IPv4 (AF_INET) or IPv6 (AF_INET6) address stored in sa to its textual form.
+ * @throws minifi::Exception if the address family is neither of the two, or if the conversion fails
+ */
+std::string sockaddr_ntop(const sockaddr* sa);
+
+} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 9606184..05b589c 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -92,7 +92,7 @@ void FlowFile::setDeleted(const bool deleted) {
}
}
-std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
+std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() const {
return claim_;
}
diff --git a/libminifi/src/core/ProcessSession.cpp
b/libminifi/src/core/ProcessSession.cpp
index 6c6136f..f4afffd 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -60,6 +60,10 @@ namespace nifi {
namespace minifi {
namespace core {
+// Copies the bytes held by the read result into a std::string of the same length.
+std::string detail::to_string(const detail::ReadBufferResult& read_buffer_result) {
+  const auto& bytes = read_buffer_result.buffer;
+  const auto* const first_char = reinterpret_cast<const char*>(bytes.data());
+  return std::string(first_char, bytes.size());
+}
+
std::shared_ptr<utils::IdGenerator> ProcessSession::id_generator_ =
utils::IdGenerator::getIdGenerator();
ProcessSession::ProcessSession(std::shared_ptr<ProcessContext> processContext)
@@ -402,6 +406,30 @@ int64_t ProcessSession::readWrite(const
std::shared_ptr<core::FlowFile> &flow, I
}
}
+// Reads the content of the given flow file into a buffer.
+// Returns a ReadBufferResult whose buffer holds the bytes read and whose status is the value returned by read().
+// Throws Exception(PROCESSOR_EXCEPTION) from the callback if fewer bytes than the stream size could be read.
+detail::ReadBufferResult ProcessSession::readBuffer(const
std::shared_ptr<core::FlowFile>& flow) {
+ detail::ReadBufferResult result;
+ // Local stream callback that fills result.buffer; it holds references to the result and to this session.
+ struct Callback : InputStreamCallback {
+ detail::ReadBufferResult& result;
+ ProcessSession& session;
+ Callback(detail::ReadBufferResult& result, ProcessSession& session)
+ :result{result}, session{session}
+ {}
+ int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) final {
+ gsl_Expects(inputStream);
+ // size the buffer to the whole stream, then read it in a single call
+ result.buffer.resize(inputStream->size());
+ const auto read_status =
inputStream->read(reinterpret_cast<uint8_t*>(result.buffer.data()),
result.buffer.size());
+ // anything other than a complete read is treated as a failure
+ // NOTE(review): the log message announces a rollback; presumably the caller's exception handling performs it - confirm
+ if (read_status != result.buffer.size()) {
+ session.logger_->log_error("readBuffer: %zu bytes were requested from
the stream but %zu bytes were read. Rolling back.", result.buffer.size(),
read_status);
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire
FlowFile.");
+ }
+ return gsl::narrow<int64_t>(read_status);
+ }
+ };
+ Callback cb{result, *this};
+ result.status = read(flow, &cb);
+ return result;
+}
+
void ProcessSession::importFrom(io::InputStream&& stream, const
std::shared_ptr<core::FlowFile> &flow) {
importFrom(stream, flow);
}
diff --git a/libminifi/src/io/ClientSocket.cpp
b/libminifi/src/io/ClientSocket.cpp
index 61d502b..18ee695 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -48,20 +48,13 @@
#include "utils/file/FileUtils.h"
#include "utils/gsl.h"
#include "utils/OsUtils.h"
+#include "utils/net/DNS.h"
+#include "utils/net/Socket.h"
namespace util = org::apache::nifi::minifi::utils;
namespace mio = org::apache::nifi::minifi::io;
namespace {
-std::string get_last_getaddrinfo_err_str(int getaddrinfo_result) {
-#ifdef WIN32
- (void)getaddrinfo_result; // against unused warnings on windows
- return mio::get_last_socket_error_message();
-#else
- return gai_strerror(getaddrinfo_result);
-#endif /* WIN32 */
-}
-
template<typename T, typename Pred, typename Adv>
auto find_if_custom_linked_list(T* const list, const Adv advance_func, const
Pred predicate) ->
typename
std::enable_if<std::is_convertible<decltype(advance_func(std::declval<T*>())),
T*>::value && std::is_convertible<decltype(predicate(std::declval<T*>())),
bool>::value, T*>::type
@@ -105,7 +98,7 @@ std::error_code set_non_blocking(const mio::SocketDescriptor
fd) noexcept {
}
#else
u_long iMode = 1;
- if (ioctlsocket(fd, FIONBIO, &iMode) == SOCKET_ERROR) {
+ if (ioctlsocket(fd, FIONBIO, &iMode) == mio::SocketError) {
return { WSAGetLastError(), std::system_category() };
}
#endif /* !WIN32 */
@@ -119,18 +112,10 @@ namespace nifi {
namespace minifi {
namespace io {
-std::string get_last_socket_error_message() {
-#ifdef WIN32
- const auto error_code = WSAGetLastError();
-#else
- const auto error_code = errno;
-#endif /* WIN32 */
- return std::system_category().message(error_code);
-}
bool valid_socket(const SocketDescriptor fd) noexcept {
#ifdef WIN32
- return fd != INVALID_SOCKET && fd >= 0;
+ return fd != InvalidSocket && fd >= 0;
#else
return fd >= 0;
#endif /* WIN32 */
@@ -179,7 +164,7 @@ Socket& Socket::operator=(Socket &&other) noexcept {
port_ = std::exchange(other.port_, 0);
is_loopback_only_ = std::exchange(other.is_loopback_only_, false);
local_network_interface_ = std::exchange(other.local_network_interface_, {});
- socket_file_descriptor_ = std::exchange(other.socket_file_descriptor_,
INVALID_SOCKET);
+ socket_file_descriptor_ = std::exchange(other.socket_file_descriptor_,
InvalidSocket);
total_list_ = other.total_list_;
FD_ZERO(&other.total_list_);
read_fds_ = other.read_fds_;
@@ -208,7 +193,7 @@ void Socket::close() {
#else
::close(socket_file_descriptor_);
#endif
- socket_file_descriptor_ = INVALID_SOCKET;
+ socket_file_descriptor_ = InvalidSocket;
}
if (total_written_ > 0) {
local_network_interface_.log_write(gsl::narrow<uint32_t>(total_written_.load()));
@@ -229,7 +214,7 @@ void Socket::setNonBlocking() {
int8_t Socket::createConnection(const addrinfo* const destination_addresses) {
for (const auto *current_addr = destination_addresses; current_addr;
current_addr = current_addr->ai_next) {
if (!valid_socket(socket_file_descriptor_ =
socket(current_addr->ai_family, current_addr->ai_socktype,
current_addr->ai_protocol))) {
- logger_->log_warn("socket: %s", get_last_socket_error_message());
+ logger_->log_warn("socket: %s",
utils::net::get_last_socket_error().message());
continue;
}
setSocketOptions(socket_file_descriptor_);
@@ -237,20 +222,20 @@ int8_t Socket::createConnection(const addrinfo* const
destination_addresses) {
if (listeners_ > 0) {
// server socket
const auto bind_result = bind(socket_file_descriptor_,
current_addr->ai_addr, current_addr->ai_addrlen);
- if (bind_result == SOCKET_ERROR) {
- logger_->log_warn("bind: %s", get_last_socket_error_message());
+ if (bind_result == SocketError) {
+ logger_->log_warn("bind: %s",
utils::net::get_last_socket_error().message());
close();
continue;
}
const auto listen_result = listen(socket_file_descriptor_, listeners_);
- if (listen_result == SOCKET_ERROR) {
- logger_->log_warn("listen: %s", get_last_socket_error_message());
+ if (listen_result == SocketError) {
+ logger_->log_warn("listen: %s",
utils::net::get_last_socket_error().message());
close();
continue;
}
- logger_->log_info("Listening on %s:%" PRIu16 " with backlog %" PRIu16,
utils::OsUtils::sockaddr_ntop(current_addr->ai_addr), port_, listeners_);
+ logger_->log_info("Listening on %s:%" PRIu16 " with backlog %" PRIu16,
utils::net::sockaddr_ntop(current_addr->ai_addr), port_, listeners_);
} else {
// client socket
#ifndef WIN32
@@ -262,13 +247,13 @@ int8_t Socket::createConnection(const addrinfo* const
destination_addresses) {
#endif /* !WIN32 */
const auto connect_result = connect(socket_file_descriptor_,
current_addr->ai_addr, current_addr->ai_addrlen);
- if (connect_result == SOCKET_ERROR) {
- logger_->log_warn("Couldn't connect to %s:%" PRIu16 ": %s",
utils::OsUtils::sockaddr_ntop(current_addr->ai_addr), port_,
get_last_socket_error_message());
+ if (connect_result == SocketError) {
+ logger_->log_warn("Couldn't connect to %s:%" PRIu16 ": %s",
utils::net::sockaddr_ntop(current_addr->ai_addr), port_,
utils::net::get_last_socket_error().message());
close();
continue;
}
- logger_->log_info("Connected to %s:%" PRIu16,
utils::OsUtils::sockaddr_ntop(current_addr->ai_addr), port_);
+ logger_->log_info("Connected to %s:%" PRIu16,
utils::net::sockaddr_ntop(current_addr->ai_addr), port_);
}
FD_SET(socket_file_descriptor_, &total_list_);
@@ -293,8 +278,8 @@ int8_t Socket::createConnection(const addrinfo *, ip4addr
&addr) {
sa.sin_family = AF_INET;
sa.sin_port = htons(port_);
sa.sin_addr.s_addr = htonl(is_loopback_only_ ? INADDR_LOOPBACK :
INADDR_ANY);
- if (bind(socket_file_descriptor_, reinterpret_cast<const sockaddr*>(&sa),
sizeof(struct sockaddr_in)) == SOCKET_ERROR) {
- logger_->log_error("Could not bind to socket, reason %s",
get_last_socket_error_message());
+ if (bind(socket_file_descriptor_, reinterpret_cast<const sockaddr*>(&sa),
sizeof(struct sockaddr_in)) == SocketError) {
+ logger_->log_error("Could not bind to socket, reason %s",
utils::net::get_last_socket_error().message());
return -1;
}
@@ -323,7 +308,7 @@ int8_t Socket::createConnection(const addrinfo *, ip4addr
&addr) {
#ifdef WIN32
sa_loc.sin_addr.s_addr = addr.s_addr;
}
- if (connect(socket_file_descriptor_, reinterpret_cast<const
sockaddr*>(&sa_loc), sizeof(sockaddr_in)) == SOCKET_ERROR) {
+ if (connect(socket_file_descriptor_, reinterpret_cast<const
sockaddr*>(&sa_loc), sizeof(sockaddr_in)) == SocketError) {
int err = WSAGetLastError();
if (err == WSAEADDRNOTAVAIL) {
logger_->log_error("invalid or unknown IP");
@@ -350,31 +335,20 @@ int8_t Socket::createConnection(const addrinfo *, ip4addr
&addr) {
}
int Socket::initialize() {
- addrinfo hints{};
- memset(&hints, 0, sizeof hints); // make sure the struct is empty
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_CANONNAME;
- if (listeners_ > 0 && !is_loopback_only_)
- hints.ai_flags = AI_PASSIVE;
- hints.ai_protocol = 0; /* any protocol */
-
- const char* const gai_node = [this]() -> const char* {
+ const char* const hostname = [this]() -> const char* {
if (is_loopback_only_) return "localhost";
if (!is_loopback_only_ && listeners_ > 0) return nullptr; // all
non-localhost server sockets listen on wildcard address
if (!requested_hostname_.empty()) return requested_hostname_.c_str();
return nullptr;
}();
- const auto gai_service = std::to_string(port_);
- addrinfo* getaddrinfo_result = nullptr;
- const int errcode = getaddrinfo(gai_node, gai_service.c_str(), &hints,
&getaddrinfo_result);
- const std::unique_ptr<addrinfo, util::addrinfo_deleter> addr_info{
getaddrinfo_result };
- getaddrinfo_result = nullptr;
- if (errcode != 0) {
- logger_->log_error("getaddrinfo: %s",
get_last_getaddrinfo_err_str(errcode));
+ const bool is_server = hostname == nullptr;
+ const auto addr_info_or_error = utils::net::resolveHost(hostname, port_,
utils::net::IpProtocol::Tcp, !is_server);
+ if (!addr_info_or_error) {
+ logger_->log_error("getaddrinfo: %s",
addr_info_or_error.error().message());
return -1;
}
- socket_file_descriptor_ = INVALID_SOCKET;
+ const auto& addr_info = *addr_info_or_error;
+ socket_file_descriptor_ = InvalidSocket;
// AI_CANONNAME always sets ai_canonname of the first addrinfo structure
canonical_hostname_ = !IsNullOrEmpty(addr_info->ai_canonname) ?
addr_info->ai_canonname : requested_hostname_;
@@ -487,7 +461,7 @@ size_t Socket::write(const uint8_t *value, size_t size) {
// check for errors
if (send_ret <= 0) {
utils::file::FileUtils::close(fd);
- logger_->log_error("Could not send to %d, error: %s", fd,
get_last_socket_error_message());
+ logger_->log_error("Could not send to %d, error: %s", fd,
utils::net::get_last_socket_error().message());
return STREAM_ERROR;
}
bytes += gsl::narrow<size_t>(send_ret);
diff --git a/libminifi/src/io/tls/TLSSocket.cpp
b/libminifi/src/io/tls/TLSSocket.cpp
index af8772a..85e3958 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -292,7 +292,7 @@ int16_t TLSSocket::select_descriptor(const uint16_t msec) {
if (listeners_ > 0) {
const auto newfd = accept(socket_file_descriptor_, nullptr, nullptr);
if (!valid_socket(newfd)) {
- logger_->log_error("accept: %s", get_last_socket_error_message());
+ logger_->log_error("accept: %s",
utils::net::get_last_socket_error().message());
return -1;
}
FD_SET(newfd, &total_list_); // add to master set
diff --git a/libminifi/src/utils/NetworkInterfaceInfo.cpp
b/libminifi/src/utils/NetworkInterfaceInfo.cpp
index 82cf763..3384c6b 100644
--- a/libminifi/src/utils/NetworkInterfaceInfo.cpp
+++ b/libminifi/src/utils/NetworkInterfaceInfo.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*/
#include "utils/NetworkInterfaceInfo.h"
-#include "utils/OsUtils.h"
+#include "utils/net/Socket.h"
#include "core/logging/LoggerConfiguration.h"
#ifdef WIN32
#include <Windows.h>
@@ -56,9 +56,9 @@ NetworkInterfaceInfo::NetworkInterfaceInfo(const
IP_ADAPTER_ADDRESSES* adapter)
name_ = utf8_encode(adapter->FriendlyName);
for (auto unicast_address = adapter->FirstUnicastAddress; unicast_address !=
nullptr; unicast_address = unicast_address->Next) {
if (unicast_address->Address.lpSockaddr->sa_family == AF_INET) {
-
ip_v4_addresses_.push_back(OsUtils::sockaddr_ntop(unicast_address->Address.lpSockaddr));
+
ip_v4_addresses_.push_back(net::sockaddr_ntop(unicast_address->Address.lpSockaddr));
} else if (unicast_address->Address.lpSockaddr->sa_family == AF_INET6) {
-
ip_v6_addresses_.push_back(OsUtils::sockaddr_ntop(unicast_address->Address.lpSockaddr));
+
ip_v6_addresses_.push_back(net::sockaddr_ntop(unicast_address->Address.lpSockaddr));
}
}
running_ = adapter->OperStatus == IfOperStatusUp;
@@ -68,9 +68,9 @@ NetworkInterfaceInfo::NetworkInterfaceInfo(const
IP_ADAPTER_ADDRESSES* adapter)
NetworkInterfaceInfo::NetworkInterfaceInfo(const struct ifaddrs* ifa) {
name_ = ifa->ifa_name;
if (ifa->ifa_addr->sa_family == AF_INET) {
- ip_v4_addresses_.push_back(OsUtils::sockaddr_ntop(ifa->ifa_addr));
+ ip_v4_addresses_.push_back(net::sockaddr_ntop(ifa->ifa_addr));
} else if (ifa->ifa_addr->sa_family == AF_INET6) {
- ip_v6_addresses_.push_back(OsUtils::sockaddr_ntop(ifa->ifa_addr));
+ ip_v6_addresses_.push_back(net::sockaddr_ntop(ifa->ifa_addr));
}
running_ = (ifa->ifa_flags & IFF_RUNNING);
loopback_ = (ifa->ifa_flags & IFF_LOOPBACK);
diff --git a/libminifi/src/utils/OsUtils.cpp b/libminifi/src/utils/OsUtils.cpp
index c4bc58f..cfac877 100644
--- a/libminifi/src/utils/OsUtils.cpp
+++ b/libminifi/src/utils/OsUtils.cpp
@@ -320,40 +320,6 @@ std::string OsUtils::getMachineArchitecture() {
return "unknown";
}
-namespace {
-std::string get_last_socket_error_message() {
-#ifdef WIN32
- const auto error_code = WSAGetLastError();
-#else
- const auto error_code = errno;
-#endif /* WIN32 */
- return std::system_category().message(error_code);
-}
-}
-
-std::string OsUtils::sockaddr_ntop(const sockaddr* const sa) {
- std::string result;
- if (sa->sa_family == AF_INET) {
- sockaddr_in sa_in{};
- std::memcpy(reinterpret_cast<void*>(&sa_in), sa, sizeof(sockaddr_in));
- result.resize(INET_ADDRSTRLEN);
- if (inet_ntop(AF_INET, &sa_in.sin_addr, &result[0], INET_ADDRSTRLEN) ==
nullptr) {
- throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION,
get_last_socket_error_message() };
- }
- } else if (sa->sa_family == AF_INET6) {
- sockaddr_in6 sa_in6{};
- std::memcpy(reinterpret_cast<void*>(&sa_in6), sa, sizeof(sockaddr_in6));
- result.resize(INET6_ADDRSTRLEN);
- if (inet_ntop(AF_INET6, &sa_in6.sin6_addr, &result[0], INET6_ADDRSTRLEN)
== nullptr) {
- throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION,
get_last_socket_error_message() };
- }
- } else {
- throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION,
"sockaddr_ntop: unknown address family" };
- }
- result.resize(strlen(result.c_str())); // discard remaining null bytes at
the end
- return result;
-}
-
} // namespace utils
} // namespace minifi
} // namespace nifi
diff --git a/libminifi/src/utils/net/DNS.cpp b/libminifi/src/utils/net/DNS.cpp
new file mode 100644
index 0000000..eac09e2
--- /dev/null
+++ b/libminifi/src/utils/net/DNS.cpp
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/net/DNS.h"
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include "utils/net/Socket.h"
+#else
+#include <netdb.h>
+#include <cstring>
+#endif /* WIN32 */
+
+namespace org::apache::nifi::minifi::utils::net {
+
+namespace {
+
+#ifndef WIN32
+// Error category for getaddrinfo() return codes: message() translates them with gai_strerror().
+// Only compiled when WIN32 is not defined.
+class addrinfo_category : public std::error_category {
+ public:
+ [[nodiscard]] const char* name() const noexcept override { return
"addrinfo"; }
+
+ [[nodiscard]] std::string message(int value) const override {
+ return gai_strerror(value);
+ }
+};
+
+// Returns the single, function-local static instance of addrinfo_category.
+const addrinfo_category& get_addrinfo_category() {
+ static addrinfo_category instance;
+ return instance;
+}
+#endif
+
+// Turns a failed getaddrinfo() call into a std::error_code.
+// With WIN32 defined the code comes from WSAGetLastError() in the system category and the argument is unused;
+// otherwise the getaddrinfo() return value is used with the addrinfo category.
+std::error_code get_last_getaddrinfo_err_code(int getaddrinfo_result) {
+#ifdef WIN32
+  static_cast<void>(getaddrinfo_result);  // against unused warnings on windows
+  const int wsa_error = WSAGetLastError();
+  return {wsa_error, std::system_category()};
+#else
+  return {getaddrinfo_result, get_addrinfo_category()};
+#endif /* WIN32 */
+}
+} // namespace
+
+// Releases an addrinfo list with freeaddrinfo().
+void addrinfo_deleter::operator()(addrinfo* const p) const noexcept {
+ freeaddrinfo(p);
+}
+
+/**
+ * Resolve hostname and port with getaddrinfo().
+ * Any address family is accepted (AF_UNSPEC); socket type and protocol follow from the protocol argument.
+ * @param hostname host to resolve; when null, AI_PASSIVE is added to the lookup flags
+ * @param port service name or port number as a string
+ * @param protocol Tcp selects SOCK_STREAM/IPPROTO_TCP, Udp selects SOCK_DGRAM/IPPROTO_UDP
+ * @param need_canonname when true, AI_CANONNAME is added to the lookup flags
+ * @return the owned addrinfo list on success, or the error code of the failed lookup
+ */
+nonstd::expected<gsl::not_null<std::unique_ptr<addrinfo, addrinfo_deleter>>, std::error_code> resolveHost(const char* const hostname, const char* const port, const IpProtocol protocol,
+    const bool need_canonname) {
+  addrinfo hints{};
+  memset(&hints, 0, sizeof hints);  // make sure the struct is empty
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = protocol == IpProtocol::Tcp ? SOCK_STREAM : SOCK_DGRAM;
+  hints.ai_flags = need_canonname ? AI_CANONNAME : 0;
+  if (!hostname)
+    hints.ai_flags |= AI_PASSIVE;
+  hints.ai_protocol = [protocol]() -> int {
+    switch (protocol) {
+      case IpProtocol::Tcp: return IPPROTO_TCP;
+      case IpProtocol::Udp: return IPPROTO_UDP;
+    }
+    return 0;
+  }();
+
+  addrinfo* getaddrinfo_result = nullptr;
+  const int errcode = getaddrinfo(hostname, port, &hints, &getaddrinfo_result);
+  // Take ownership in a plain (nullable) unique_ptr first: on failure the result pointer is null,
+  // and wrapping a null pointer in gsl::not_null would violate its contract instead of reporting the error.
+  std::unique_ptr<addrinfo, addrinfo_deleter> addr_info{getaddrinfo_result};
+  getaddrinfo_result = nullptr;
+  if (errcode != 0) {
+    return nonstd::make_unexpected(get_last_getaddrinfo_err_code(errcode));
+  }
+  return gsl::make_not_null(std::move(addr_info));
+}
+
+} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/src/utils/net/Socket.cpp
b/libminifi/src/utils/net/Socket.cpp
new file mode 100644
index 0000000..533dd94
--- /dev/null
+++ b/libminifi/src/utils/net/Socket.cpp
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/net/Socket.h"
+#include "Exception.h"
+#include <cstring>
+#include <system_error>
+#ifdef WIN32
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif /* WIN32_LEAN_AND_MEAN */
+#include <ws2tcpip.h>
+#else
+#include <arpa/inet.h>
+#endif /* WIN32 */
+
+namespace org::apache::nifi::minifi::utils::net {
+// Wraps the platform's last socket error (WSAGetLastError() with WIN32 defined, errno otherwise)
+// in a std::error_code of the system category.
+std::error_code get_last_socket_error() {
+#ifdef WIN32
+  return std::error_code{WSAGetLastError(), std::system_category()};
+#else
+  return std::error_code{errno, std::system_category()};
+#endif /* WIN32 */
+}
+
+// Walks the addrinfo list and returns a handle for the first element socket() accepts, together with
+// that element. If socket() fails for every element, the last socket error is returned instead.
+nonstd::expected<OpenSocketResult, std::error_code> open_socket(const gsl::not_null<const addrinfo*> getaddrinfo_result) {
+  const addrinfo* candidate = getaddrinfo_result;
+  while (candidate) {
+    const auto sockfd = socket(candidate->ai_family, candidate->ai_socktype, candidate->ai_protocol);
+    if (sockfd != utils::net::InvalidSocket) {
+      return OpenSocketResult{UniqueSocketHandle{sockfd}, gsl::make_not_null(candidate)};
+    }
+    candidate = candidate->ai_next;
+  }
+  return nonstd::make_unexpected(get_last_socket_error());
+}
+
+// Formats the address stored in sa as text with inet_ntop().
+// Handles AF_INET and AF_INET6; throws minifi::Exception for any other family or when inet_ntop() fails.
+std::string sockaddr_ntop(const sockaddr* const sa) {
+  std::string text;
+  switch (sa->sa_family) {
+    case AF_INET: {
+      sockaddr_in ipv4{};
+      std::memcpy(&ipv4, sa, sizeof(sockaddr_in));
+      text.resize(INET_ADDRSTRLEN);
+      if (inet_ntop(AF_INET, &ipv4.sin_addr, &text[0], INET_ADDRSTRLEN) == nullptr) {
+        throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, get_last_socket_error().message() };
+      }
+      break;
+    }
+    case AF_INET6: {
+      sockaddr_in6 ipv6{};
+      std::memcpy(&ipv6, sa, sizeof(sockaddr_in6));
+      text.resize(INET6_ADDRSTRLEN);
+      if (inet_ntop(AF_INET6, &ipv6.sin6_addr, &text[0], INET6_ADDRSTRLEN) == nullptr) {
+        throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, get_last_socket_error().message() };
+      }
+      break;
+    }
+    default:
+      throw minifi::Exception{ minifi::ExceptionType::GENERAL_EXCEPTION, "sockaddr_ntop: unknown address family" };
+  }
+  text.resize(strlen(text.c_str()));  // discard remaining null bytes at the end
+  return text;
+}
+
+} // namespace org::apache::nifi::minifi::utils::net
diff --git a/libminifi/test/SingleInputTestController.h
b/libminifi/test/SingleInputTestController.h
new file mode 100644
index 0000000..16c5354
--- /dev/null
+++ b/libminifi/test/SingleInputTestController.h
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <memory>
+#include <set>
+#include <string>
+#include <string_view>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+#include "TestBase.h"
+#include "core/Processor.h"
+
+namespace org::apache::nifi::minifi::test {
+// Test controller that wires a single processor into a plan with one incoming connection and one outgoing
+// connection per supported relationship, so a test can feed one flow file in and collect what comes out.
+class SingleInputTestController : public TestController {
+ public:
+ // Registers the processor in the plan under the processor's own name.
+ explicit SingleInputTestController(const std::shared_ptr<core::Processor>&
processor)
+ : processor_{plan->addProcessor(processor, processor->getName())}
+ {}
+
+ // Creates a flow file with the given content and attributes, puts it on the input connection,
+ // runs the processor, then drains every outgoing connection.
+ // Returns the flow files collected from each outgoing connection, keyed by relationship.
+ std::unordered_map<core::Relationship,
std::vector<std::shared_ptr<core::FlowFile>>>
+ trigger(const std::string_view input_flow_file_content,
std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
+ const auto new_flow_file = createFlowFile(input_flow_file_content,
std::move(input_flow_file_attributes));
+ input_->put(new_flow_file);
+ plan->runProcessor(processor_);
+ std::unordered_map<core::Relationship,
std::vector<std::shared_ptr<core::FlowFile>>> result;
+ for (const auto& [relationship, connection]: outgoing_connections_) {
+ std::set<std::shared_ptr<core::FlowFile>> expired_flow_files;
+ std::vector<std::shared_ptr<core::FlowFile>> output_flow_files;
+ while (connection->isWorkAvailable()) {
+ auto output_flow_file = connection->poll(expired_flow_files);
+ // no flow file is expected to expire during a test run
+ CHECK(expired_flow_files.empty());
+ if (!output_flow_file) continue;
+ output_flow_files.push_back(std::move(output_flow_file));
+ }
+ result.insert_or_assign(relationship, std::move(output_flow_files));
+ }
+ return result;
+ }
+
+ // Adds an outgoing connection for a relationship with the given name and an empty description,
+ // so that trigger() also collects flow files routed to it. Returns the created relationship.
+ core::Relationship addDynamicRelationship(std::string name) {
+ auto relationship = core::Relationship{std::move(name), ""};
+ outgoing_connections_.insert_or_assign(relationship,
plan->addConnection(processor_, relationship, nullptr));
+ return relationship;
+ }
+
+ private:
+ // Builds a FlowFileRecord carrying the given attributes and writes the content through a
+ // content repository session of the plan; the flow file's size is taken from the written stream.
+ std::shared_ptr<core::FlowFile> createFlowFile(const std::string_view
content, std::unordered_map<std::string, std::string> attributes) {
+ const auto flow_file = std::make_shared<FlowFileRecord>();
+ for (auto& attr : std::move(attributes)) {
+ flow_file->setAttribute(attr.first, std::move(attr.second));
+ }
+ auto content_session = plan->getContentRepo()->createSession();
+ auto claim = content_session->create();
+ auto stream = content_session->write(claim);
+ stream->write(reinterpret_cast<const uint8_t*>(content.data()),
content.size());
+ flow_file->setResourceClaim(claim);
+ flow_file->setSize(stream->size());
+ flow_file->setOffset(0);
+
+ stream->close();
+ content_session->commit();
+ return flow_file;
+ }
+
+ public:
+ // Declared before processor_, outgoing_connections_ and input_ because their initializers use it;
+ // members are initialized in declaration order, so this order must be kept.
+ std::shared_ptr<TestPlan> plan = createPlan();
+
+ protected:
+ core::Processor& getProcessor() const { return *processor_; }
+
+ private:
+ std::shared_ptr<core::Processor> processor_;
+ // One outgoing connection per relationship the processor supports, created at construction.
+ std::unordered_map<core::Relationship, std::shared_ptr<Connection>>
outgoing_connections_{[this] {
+ std::unordered_map<core::Relationship, std::shared_ptr<Connection>> result;
+ for (const auto& relationship: processor_->getSupportedRelationships()) {
+ result.insert_or_assign(relationship, plan->addConnection(processor_,
relationship, nullptr));
+ }
+ return result;
+ }()};
+ // Incoming connection of the processor; it has no source processor.
+ std::shared_ptr<Connection> input_ = plan->addConnection(nullptr,
core::Relationship{"success", "success"}, processor_);
+};
+
+} // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 4dc83b3..324cb37 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -592,8 +592,8 @@ void TestPlan::validateAnnotations() const {
}
}
-std::string TestPlan::getContent(const
std::shared_ptr<minifi::core::FlowFile>& file) {
- auto content_claim = file->getResourceClaim();
+std::string TestPlan::getContent(const minifi::core::FlowFile& file) const {
+ auto content_claim = file.getResourceClaim();
auto content_stream = content_repo_->read(*content_claim);
auto output_stream = std::make_shared<minifi::io::BufferStream>();
minifi::InputStreamPipe{output_stream}.process(content_stream);
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index bbee252..10833b0 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -247,7 +247,8 @@ class TestPlan {
return state_manager_provider_;
}
- std::string getContent(const std::shared_ptr<minifi::core::FlowFile>& file);
+ std::string getContent(const std::shared_ptr<const minifi::core::FlowFile>&
file) const { return getContent(*file); }  // Convenience overload: forwards to the reference overload; `file` is dereferenced without a null check.
+ std::string getContent(const minifi::core::FlowFile& file) const;  // Reference overload; defined out of line.
void finalize();
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h
b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 581b68e..a506e6b 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -42,18 +42,6 @@ struct WriteStringToFlowFile : public
minifi::OutputStreamCallback {
}
};
-struct ReadUntilStreamSize : public minifi::InputStreamCallback {
- std::string value_;
-
- int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream)
override {
- value_.clear();
- std::vector<uint8_t> buffer;
- size_t bytes_read = stream->read(buffer, stream->size());
- value_.assign(buffer.begin(), buffer.end());
- return minifi::io::isError(bytes_read) ? -1 :
gsl::narrow<int64_t>(bytes_read);
- }
-};
-
struct ReadUntilItCan : public minifi::InputStreamCallback {
std::string value_;
@@ -132,22 +120,21 @@ void
testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
auto clone_second_half = process_session.clone(original_ff, 3, 3);
REQUIRE(clone_first_half != nullptr);
REQUIRE(clone_second_half != nullptr);
- ReadUntilStreamSize read_until_stream_size_callback;
ReadUntilItCan read_until_it_can_callback;
- process_session.read(original_ff, &read_until_stream_size_callback);
+ const auto read_result_original = process_session.readBuffer(original_ff);
process_session.read(original_ff, &read_until_it_can_callback);
CHECK(original_ff->getSize() == 6);
- CHECK(read_until_stream_size_callback.value_ == "foobar");
+ CHECK(to_string(read_result_original) == "foobar");
CHECK(read_until_it_can_callback.value_ == "foobar");
- process_session.read(clone_first_half, &read_until_stream_size_callback);
+ const auto read_result_first_half =
process_session.readBuffer(clone_first_half);
process_session.read(clone_first_half, &read_until_it_can_callback);
CHECK(clone_first_half->getSize() == 3);
- CHECK(read_until_stream_size_callback.value_ == "foo");
+ CHECK(to_string(read_result_first_half) == "foo");
CHECK(read_until_it_can_callback.value_ == "foo");
- process_session.read(clone_second_half, &read_until_stream_size_callback);
+ const auto read_result_second_half =
process_session.readBuffer(clone_second_half);
process_session.read(clone_second_half, &read_until_it_can_callback);
CHECK(clone_second_half->getSize() == 3);
- CHECK(read_until_stream_size_callback.value_ == "bar");
+ CHECK(to_string(read_result_second_half) == "bar");
CHECK(read_until_it_can_callback.value_ == "bar");
}
@@ -163,11 +150,10 @@ void
testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> cont
fixture.transferAndCommit(flow_file);
CHECK(flow_file->getSize() == 8);
- ReadUntilStreamSize read_until_stream_size_callback;
ReadUntilItCan read_until_it_can_callback;
- process_session.read(flow_file, &read_until_stream_size_callback);
+ const auto read_result = process_session.readBuffer(flow_file);
process_session.read(flow_file, &read_until_it_can_callback);
- CHECK(read_until_stream_size_callback.value_ == "myfoobar");
+ CHECK(to_string(read_result) == "myfoobar");
CHECK(read_until_it_can_callback.value_ == "myfoobar");
}
@@ -182,11 +168,10 @@ void
testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> conten
fixture.transferAndCommit(flow_file);
CHECK(flow_file->getSize() == 8);
- ReadUntilStreamSize read_until_stream_size_callback;
+ const auto read_result = process_session.readBuffer(flow_file);
ReadUntilItCan read_until_it_can_callback;
- process_session.read(flow_file, &read_until_stream_size_callback);
process_session.read(flow_file, &read_until_it_can_callback);
- CHECK(read_until_stream_size_callback.value_ == "myfoobar");
+ CHECK(to_string(read_result) == "myfoobar");
CHECK(read_until_it_can_callback.value_ == "myfoobar");
}
} // namespace ContentRepositoryDependentTests
diff --git a/libminifi/test/unit/ExpectedTest.cpp
b/libminifi/test/unit/ExpectedTest.cpp
new file mode 100644
index 0000000..c686ef6
--- /dev/null
+++ b/libminifi/test/unit/ExpectedTest.cpp
@@ -0,0 +1,485 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define EXTENSION_LIST ""
+#include <string_view>
+#include "../TestBase.h"
+#include "utils/expected.h"
+#include "utils/gsl.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+// shamelessly copied from
https://github.com/TartanLlama/expected/blob/master/tests/extensions.cpp
(License: CC0)
+TEST_CASE("expected map", "[expected][map]") {  // Exercises `expected | utils::map(f)` for lvalue, const lvalue, rvalue and const rvalue operands, in both value and error states.
+ auto mul2 = [](int a) { return a * 2; };
+ auto ret_void = [](int) {};
+
+ {  // next four blocks: a value is present, so the result holds mul2(21) == 42
+ nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::map(mul2);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::map(mul2);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {
+ nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::map(mul2);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::map(mul2);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {  // next four blocks: an error is present, so the result carries the same error (21)
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::map(mul2);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::map(mul2);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::map(mul2);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::map(mul2);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {  // next four blocks: value present, void-returning callback; the result type must be expected<void, int>
+ nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::map(ret_void);
+ REQUIRE(ret);
+ STATIC_REQUIRE(
+ (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+ }
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::map(ret_void);
+ REQUIRE(ret);
+ STATIC_REQUIRE(
+ (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+ }
+
+ {
+ nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::map(ret_void);
+ REQUIRE(ret);
+ STATIC_REQUIRE(
+ (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+ }
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::map(ret_void);
+ REQUIRE(ret);
+ STATIC_REQUIRE(
+ (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+ }
+
+ {  // next four blocks: error present, void-returning callback; the result is still expected<void, int> and is in the error state
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::map(ret_void);
+ REQUIRE(!ret);
+ STATIC_REQUIRE(
+ (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::map(ret_void);
+ REQUIRE(!ret);
+ STATIC_REQUIRE(
+ (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+ }
+
+ {
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::map(ret_void);
+ REQUIRE(!ret);
+ STATIC_REQUIRE(
+ (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::map(ret_void);
+ REQUIRE(!ret);
+ STATIC_REQUIRE(
+ (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
+ }
+
+
+ // mapping functions which return references
+ {
+ nonstd::expected<int, int> e(42);
+ auto ret = e | utils::map([](int& i) -> int& { return i; });
+ REQUIRE(ret);
+ REQUIRE(ret == 42);  // compares the expected itself (not *ret) against the value
+ }
+}
+
+TEST_CASE("expected flatMap", "[expected][flatMap]") {  // Exercises `expected | utils::flatMap(f)` where f itself returns an expected, for every operand value category and both states.
+ auto succeed = [](int) { return nonstd::expected<int, int>(21 * 2); };
+ auto fail = [](int) { return nonstd::expected<int, int>(nonstd::unexpect,
17); };
+
+ {  // next four blocks: value present + succeeding callback -> result holds the callback's value (42)
+ nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::flatMap(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::flatMap(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {
+ nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::flatMap(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::flatMap(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {  // next four blocks: value present + failing callback -> result holds the callback's error (17)
+ nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::flatMap(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 17);
+ }
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::flatMap(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 17);
+ }
+
+ {
+ nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::flatMap(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 17);
+ }
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::flatMap(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 17);
+ }
+
+ {  // next four blocks: error present + succeeding callback -> the original error (21) is kept
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::flatMap(succeed);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::flatMap(succeed);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::flatMap(succeed);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::flatMap(succeed);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {  // next four blocks: error present + failing callback -> still the original error (21), not the callback's 17
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::flatMap(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::flatMap(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::flatMap(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::flatMap(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+}
+
+TEST_CASE("expected orElse", "[expected][orElse]") {  // Exercises `expected | utils::orElse(f)`: f is given the error and may return a replacement expected or nothing.
+ // commented out parts depend on
https://github.com/martinmoene/expected-lite/pull/40 (waiting for release)
+ // using eptr = std::unique_ptr<int>;
+ auto succeed = [](int) { return nonstd::expected<int, int>(21 * 2); };
+ // auto succeedptr = [](eptr e) { return nonstd::expected<int, eptr>(21*2);};
+ auto fail = [](int) { return nonstd::expected<int, int>(nonstd::unexpect,
17);};
+ // auto efail = [](eptr e) { *e = 17;return nonstd::expected<int,
eptr>(nonstd::unexpect, std::move(e));};
+ // auto failptr = [](eptr e) { return nonstd::expected<int,
eptr>(nonstd::unexpect, std::move(e));};
+ auto failvoid = [](int) {};
+ // auto failvoidptr = [](const eptr&) { /* don't consume */};
+ // auto consumeptr = [](eptr) {};
+ // auto make_u_int = [](int n) { return std::unique_ptr<int>(new int(n));};
+
+ {  // value present + `succeed`: the original value (21) is kept, not the callback's 42
+ nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::orElse(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 21);
+ }
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::orElse(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 21);
+ }
+
+ {
+ nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::orElse(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 21);
+ }
+
+ /*
+ {
+ nonstd::expected<int, eptr> e = 21;
+ auto ret = std::move(e) | utils::orElse(succeedptr);
+ REQUIRE(ret);
+ REQUIRE(*ret == 21);
+ }
+ */
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::orElse(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 21);
+ }
+
+ {  // value present + `fail`: the original value (21) is still kept
+ nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::orElse(fail);
+ REQUIRE(ret);
+ REQUIRE(*ret == 21);
+ }
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = e | utils::orElse(fail);
+ REQUIRE(ret);
+ REQUIRE(*ret == 21);
+ }
+
+ {
+ nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::orElse(fail);
+ REQUIRE(ret);
+ REQUIRE(ret == 21);
+ }
+
+
+ /*
+ {
+ nonstd::expected<int, eptr> e = 21;
+ auto ret = std::move(e) | utils::orElse(efail);
+ REQUIRE(ret);
+ REQUIRE(ret == 21);
+ }
+ */
+
+ {
+ const nonstd::expected<int, int> e = 21;
+ auto ret = std::move(e) | utils::orElse(fail);
+ REQUIRE(ret);
+ REQUIRE(*ret == 21);
+ }
+
+ {  // error present + `succeed`: the callback's value (42) becomes the result
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::orElse(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::orElse(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::orElse(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ /*
+ {
+ nonstd::expected<int, eptr> e(nonstd::unexpect, make_u_int(21));
+ auto ret = std::move(e) | utils::orElse(succeedptr);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+ */
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::orElse(succeed);
+ REQUIRE(ret);
+ REQUIRE(*ret == 42);
+ }
+
+ {  // error present + `fail`: the callback's error (17) replaces the original one
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::orElse(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 17);
+ }
+
+ {  // error present + void callback: the original error (21) is retained
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::orElse(failvoid);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::orElse(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 17);
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = e | utils::orElse(failvoid);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ {
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::orElse(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 17);
+ }
+
+ {
+ nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::orElse(failvoid);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+
+ /*
+ {
+ nonstd::expected<int, eptr> e(nonstd::unexpect, make_u_int(21));
+ auto ret = std::move(e) | utils::orElse(failvoidptr);
+ REQUIRE(!ret);
+ REQUIRE(*ret.error() == 21);
+ }
+
+ {
+ nonstd::expected<int, eptr> e(nonstd::unexpect, make_u_int(21));
+ auto ret = std::move(e) | utils::orElse(consumeptr);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == nullptr);
+ }
+ */
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::orElse(fail);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 17);
+ }
+
+ {
+ const nonstd::expected<int, int> e(nonstd::unexpect, 21);
+ auto ret = std::move(e) | utils::orElse(failvoid);
+ REQUIRE(!ret);
+ REQUIRE(ret.error() == 21);
+ }
+}
+
+TEST_CASE("expected valueOrElse", "[expected][valueOrElse]") {  // Exercises `expected | utils::valueOrElse(f)` on an error-state expected, with callbacks of differing arity and return type.
+ using namespace std::literals; // NOLINT
+ nonstd::expected<int, std::string> ex{nonstd::unexpect, "hello"};
+ REQUIRE(42 == (ex | utils::valueOrElse([] { return 42; })));  // a zero-argument callback supplies the fallback value
+ REQUIRE_THROWS_AS(ex | utils::valueOrElse([]{ throw 42; }), int);  // a void callback that throws is accepted as well
+ REQUIRE(gsl::narrow<int>("hello"sv.size()) == (ex |
utils::valueOrElse([](const std::string& err) { return
gsl::narrow<int>(err.size()); })));
+ REQUIRE_THROWS_AS(ex | utils::valueOrElse([](std::string){ throw 42; }),
int);
+ REQUIRE_THROWS_AS(ex | utils::valueOrElse([](const std::string&) -> int {
throw 42; }), int);
+ REQUIRE_THROWS_AS(std::move(ex) | utils::valueOrElse([](std::string&&) ->
int { throw 42; }), int);  // rvalue operand: the callback takes the error as std::string&&
+}
diff --git a/libminifi/test/unit/GeneralUtilsTest.cpp
b/libminifi/test/unit/GeneralUtilsTest.cpp
index b75e880..de017c0 100644
--- a/libminifi/test/unit/GeneralUtilsTest.cpp
+++ b/libminifi/test/unit/GeneralUtilsTest.cpp
@@ -59,7 +59,20 @@ static_assert(!does_compile<1, 0>::value, "constexpr
division by zero shouldn't
TEST_CASE("GeneralUtils::dereference", "[dereference]") {
const int a = 42;
- const auto* const pa = &a;
+ const auto pa = gsl::make_not_null(&a);
REQUIRE(42 == utils::dereference(pa));
REQUIRE(&a == &utils::dereference(pa));
+
+ const auto uniq_a = gsl::make_not_null(std::make_unique<int>(99));
+ REQUIRE(99 == utils::dereference(uniq_a));
+}
+
+TEST_CASE("GeneralUtils::unsafe_dereference", "[unsafe_dereference]") {  // Checks utils::unsafe_dereference on a raw pointer and on a std::unique_ptr.
+ const int a = 42;
+ const int* const pa = &a;
+ REQUIRE(42 == utils::unsafe_dereference(pa));
+ REQUIRE(&a == &utils::unsafe_dereference(pa));  // address identity: the result refers to the pointee itself, not a copy
+
+ const auto uniq_a = std::make_unique<int>(99);
+ REQUIRE(99 == utils::unsafe_dereference(uniq_a));
 }
diff --git a/libminifi/test/unit/NetUtilsTest.cpp
b/libminifi/test/unit/NetUtilsTest.cpp
new file mode 100644
index 0000000..9e28980
--- /dev/null
+++ b/libminifi/test/unit/NetUtilsTest.cpp
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <functional>
+#include <string>
+#include <type_traits>
+
+#include "../TestBase.h"
+#include "utils/net/DNS.h"
+#include "utils/net/Socket.h"
+#include "utils/StringUtils.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+namespace net = utils::net;
+
+TEST_CASE("net::resolveHost", "[net][dns][utils][resolveHost]") {  // Resolves a numeric address and "localhost", then checks the textual form of the first returned address.
+ REQUIRE(net::sockaddr_ntop(net::resolveHost("127.0.0.1",
"10080").value()->ai_addr) == "127.0.0.1");
+ const auto localhost_address =
net::sockaddr_ntop(net::resolveHost("localhost", "10080").value()->ai_addr);
+ REQUIRE((utils::StringUtils::startsWith(localhost_address, "127") ||
localhost_address == "::1"));  // accepts either an address starting with "127" or the IPv6 form "::1"
+}
diff --git a/libminifi/test/unit/OptionalTest.cpp
b/libminifi/test/unit/OptionalTest.cpp
index 3be8870..d713901 100644
--- a/libminifi/test/unit/OptionalTest.cpp
+++ b/libminifi/test/unit/OptionalTest.cpp
@@ -60,3 +60,14 @@ TEST_CASE("optional orElse", "[optional or else]") {
REQUIRE(!test4);
REQUIRE_THROWS_AS(std::optional<bool>{} | utils::orElse([]{ throw ex{}; }),
ex);
}
+
+TEST_CASE("optional valueOrElse", "[optional][valueOrElse]") {  // Exercises `optional | utils::valueOrElse(f)` for engaged and empty optionals.
+ const auto seven = std::make_optional(7) | utils::valueOrElse([]() -> int {
throw std::exception{}; });  // engaged optional with a throwing fallback; the check below expects the held 7
+ const auto test1 = std::make_optional(6) | utils::valueOrElse([] { return
49; });
+ const auto test2 = std::optional<int>{} | utils::valueOrElse([] { return
size_t{0}; });  // the fallback's return type (size_t) differs from the optional's value type (int)
+
+ REQUIRE(7 == seven);
+ REQUIRE(6 == test1);
+ REQUIRE(0 == test2);
+ REQUIRE_THROWS_AS(std::optional<int>{} | utils::valueOrElse([]() -> int {
throw std::exception{}; }), std::exception);  // empty optional: the fallback's exception reaches the caller
+}
diff --git a/libminifi/test/unit/ProcessContextTest.cpp
b/libminifi/test/unit/ProcessContextTest.cpp
new file mode 100644
index 0000000..a9947d7
--- /dev/null
+++ b/libminifi/test/unit/ProcessContextTest.cpp
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include "ProcessContext.h"
+#include "core/FlowFile.h"
+#include "utils/meta/detected.h"
+
+namespace org::apache::nifi::minifi {
+
+template<typename... Args>  // Detection alias: names the result type of calling getProperty(Args...) on a core::ProcessContext, and is ill-formed when no such call compiles.
+using getProperty_compiles =
decltype(std::declval<core::ProcessContext>().getProperty(std::declval<Args>()...));
+
+static_assert(!utils::meta::is_detected_v<getProperty_compiles, std::string,
const core::FlowFile&>);  // compile-time checks: getProperty must reject a flow file (by reference or shared_ptr) as second argument after a std::string name...
+static_assert(!utils::meta::is_detected_v<getProperty_compiles, std::string,
const std::shared_ptr<core::FlowFile>&>);
+static_assert(!utils::meta::is_detected_v<getProperty_compiles, std::string,
core::FlowFile&>);
+static_assert(!utils::meta::is_detected_v<getProperty_compiles, std::string,
std::shared_ptr<core::FlowFile>>);
+static_assert(!utils::meta::is_detected_v<getProperty_compiles, const char*,
const core::FlowFile&>);  // ...and likewise after a const char* name
+static_assert(!utils::meta::is_detected_v<getProperty_compiles, const char*,
const std::shared_ptr<core::FlowFile>&>);
+static_assert(!utils::meta::is_detected_v<getProperty_compiles, const char*,
core::FlowFile&>);
+static_assert(!utils::meta::is_detected_v<getProperty_compiles, const char*,
std::shared_ptr<core::FlowFile>>);
+
+} // namespace org::apache::nifi::minifi