szaszm commented on code in PR #1419:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1419#discussion_r989390171
##########
libminifi/include/utils/StringUtils.h:
##########
@@ -37,6 +37,19 @@
#include "utils/gsl.h"
#include "utils/meta/detected.h"
+// libc++ doesn't define operator<=> on strings, and apparently the operator
rewrite rules don't automagically make one
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+#include <compare>
+#endif
+
+#if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION <= 14000
+template<typename _CharT, typename _Traits, typename _Alloc>
+constexpr std::strong_ordering operator<=>(const std::basic_string<_CharT,
_Traits, _Alloc>& __lhs,
+ const std::basic_string<_CharT, _Traits, _Alloc>& __rhs) noexcept {
+ return __lhs.compare(__rhs) <=> 0;
+}
Review Comment:
Did you copy this from libstdc++? I would prefer a reimplementation without
using reserved identifiers and without reusing GNU code. I'm not sure that its
license allows reusing the code in this way.
##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/transform.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<tcp::socket>;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname =
core::PropertyBuilder::createProperty("Hostname")
+ ->withDescription("The ip address or hostname of the destination.")
+ ->withDefaultValue("localhost")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutTCP::Port =
core::PropertyBuilder::createProperty("Port")
+ ->withDescription("The port or service on the destination.")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration =
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+ ->withDescription("The amount of time a connection should be held open
without being used before closing the connection. A value of 0 seconds will
disable this feature.")
+ ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutTCP::Timeout =
core::PropertyBuilder::createProperty("Timeout")
+ ->withDescription("The timeout for connecting to and communicating with
the destination.")
+ ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile =
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+ ->withDescription("Specifies whether to send each FlowFile's content on an
individual connection.")
+ ->withDefaultValue(false)
+ ->isRequired(true)
+ ->supportsExpressionLanguage(false)
+ ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter =
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+ ->withDescription("Specifies the delimiter to use when sending messages
out over the same TCP stream. "
+ "The delimiter is appended to each FlowFile message that
is transmitted over the stream so that the receiver can determine when one
message ends and the next message begins. "
+ "Users should ensure that the FlowFile content does not
contain the delimiter character to avoid errors.")
+ ->isRequired(false)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutTCP::SSLContextService =
core::PropertyBuilder::createProperty("SSL Context Service")
+ ->withDescription("The Controller Service to use in order to obtain an SSL
Context. If this property is set, messages will be sent over a secure
connection.")
+ ->isRequired(false)
+ ->asType<minifi::controllers::SSLContextService>()
+ ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer =
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+ ->withDescription("The maximum size of the socket send buffer that should
be used. This is a suggestion to the Operating System to indicate how big the
socket buffer should be.")
+ ->isRequired(false)
+ ->asType<core::DataSizeValue>()
+ ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid)
+ : Processor(name, uuid) {}
+
+PutTCP::~PutTCP() = default;
+
+void PutTCP::initialize() {
+ setSupportedProperties(properties());
+ setSupportedRelationships(relationships());
+}
+
+void PutTCP::notifyStop() {}
+
+void PutTCP::onSchedule(core::ProcessContext* const context,
core::ProcessSessionFactory*) {
+ gsl_Expects(context);
+
+ // if the required properties are missing or empty even before evaluating
the EL expression, then we can throw in onSchedule, before we waste any flow
files
+ if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+ }
+ if (context->getProperty(Port).value_or(std::string{}).empty()) {
+ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+ }
+ if (auto idle_connection_expiration =
context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration);
idle_connection_expiration && idle_connection_expiration->getMilliseconds() >
0ms)
+ idle_connection_expiration_ =
idle_connection_expiration->getMilliseconds();
+ else
+ idle_connection_expiration_.reset();
+
+ if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout);
timeout && timeout->getMilliseconds() > 0ms)
+ timeout_ = timeout->getMilliseconds();
+ else
+ timeout_ = 15s;
+
+ std::string context_name;
+ if (context->getProperty(SSLContextService.getName(), context_name) &&
!IsNullOrEmpty(context_name))
+ ssl_context_service_ =
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
+
+ delimiter_.clear();
+ if (auto delimiter_str = context->getProperty(OutgoingMessageDelimiter)) {
+ delimiter_ = ranges::views::transform(*delimiter_str, [](char c) { return
static_cast<std::byte>(c); }) | ranges::to<std::vector>();
+ }
+
+ if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+ connections_.reset();
+ else
+ connections_.emplace();
+
+ if (auto max_size_of_socket_send_buffer =
context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer))
+ max_size_of_socket_send_buffer_ =
max_size_of_socket_send_buffer->getValue();
+ else
+ max_size_of_socket_send_buffer_.reset();
+}
+
+namespace {
+template<class SocketType>
+class ConnectionHandler : public IConnectionHandler {
+ public:
+ ConnectionHandler(ConnectionId connection_id,
+ std::chrono::milliseconds timeout,
+ std::shared_ptr<core::logging::Logger> logger,
+ std::optional<size_t> max_size_of_socket_send_buffer,
+ std::shared_ptr<controllers::SSLContextService>
ssl_context_service)
+ : connection_id_(std::move(connection_id)),
+ timeout_(timeout),
+ logger_(std::move(logger)),
+ max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
+ ssl_context_service_(std::move(ssl_context_service)) {
+ }
+
+ ~ConnectionHandler() override = default;
+
+ nonstd::expected<void, std::error_code> sendData(const
std::vector<std::byte>& data, const std::vector<std::byte>& delimiter) override;
+
+ private:
+ nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
+
+ [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const
override {
+ if (!last_used_)
+ return false;
+ return *last_used_ >= (std::chrono::steady_clock::now() - dur);
+ }
+
+ void reset() override {
+ last_used_.reset();
+ socket_.reset();
+ io_context_.reset();
+ last_error_.clear();
+ deadline_.expires_at(asio::steady_timer::time_point::max());
+ }
+
+ void checkDeadline(const std::error_code& error_code, const
std::shared_ptr<SocketType>& socket);
+ void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const
std::shared_ptr<SocketType>& socket);
+
+ void handleConnect(const std::error_code& error,
+ tcp::resolver::results_type::iterator endpoint_iter,
+ const std::shared_ptr<SocketType>& socket);
+ void handleConnectionSuccess(const tcp::resolver::results_type::iterator&
endpoint_iter,
+ const std::shared_ptr<SocketType>& socket);
+ void handleHandshake(const std::error_code& error,
+ const tcp::resolver::results_type::iterator&
endpoint_iter,
+ const std::shared_ptr<SocketType>& socket);
+
+ void handleWrite(const std::error_code& error,
+ std::size_t bytes_written,
+ const std::vector<std::byte>& delimiter,
+ const std::shared_ptr<SocketType>& socket);
+
+ void handleDelimiterWrite(const std::error_code& error, std::size_t
bytes_written, const std::shared_ptr<SocketType>& socket);
+
+ nonstd::expected<std::shared_ptr<SocketType>, std::error_code>
establishConnection(const tcp::resolver::results_type& resolved_query);
+
+ [[nodiscard]] bool hasBeenUsed() const override { return
last_used_.has_value(); }
+
+ ConnectionId connection_id_;
+ std::optional<std::chrono::steady_clock::time_point> last_used_;
+ asio::io_context io_context_;
+ std::error_code last_error_;
+ asio::steady_timer deadline_{io_context_};
+ std::chrono::milliseconds timeout_;
+ std::shared_ptr<SocketType> socket_;
+
+ std::shared_ptr<core::logging::Logger> logger_;
+ std::optional<size_t> max_size_of_socket_send_buffer_;
+
+ std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
+
+ nonstd::expected<tcp::resolver::results_type, std::error_code>
resolveHostname();
+ nonstd::expected<void, std::error_code> sendDataToSocket(const
std::shared_ptr<SocketType>& socket, const std::vector<std::byte>& data, const
std::vector<std::byte>& delimiter);
+};
+
+template<class SocketType>
+nonstd::expected<void, std::error_code>
ConnectionHandler<SocketType>::sendData(const std::vector<std::byte>& data,
const std::vector<std::byte>& delimiter) {
+ return getSocket() | utils::flatMap([&](const std::shared_ptr<SocketType>&
socket) { return sendDataToSocket(socket, data, delimiter); });;
+}
+
+template<class SocketType>
+nonstd::expected<std::shared_ptr<SocketType>, std::error_code>
ConnectionHandler<SocketType>::getSocket() {
+ if (socket_ && socket_->lowest_layer().is_open())
+ return socket_;
+ auto new_socket = resolveHostname() | utils::flatMap([&](const auto&
resolved_query) { return establishConnection(resolved_query); });
+ if (!new_socket)
+ return nonstd::make_unexpected(new_socket.error());
+ socket_ = std::move(*new_socket);
+ return socket_;
+}
+
+template<class SocketType>
+void ConnectionHandler<SocketType>::checkDeadline(const std::error_code&
error_code, const std::shared_ptr<SocketType>& socket) {
Review Comment:
`std::error_code` is cheap to copy: its size is 2 * sizeof(void*), and it is
trivial to copy. It's better to pass it by value.
Also, is it necessary to pass a shared_ptr here? The shared_ptr is never
copied in this function, so I think an observer pointer would do.
##########
extensions/standard-processors/processors/PutTCP.h:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <utility>
+
+#include "Processor.h"
+#include "utils/Export.h"
+#include "controllers/SSLContextService.h"
+
+#include "utils/expected.h"
+#include "utils/StringUtils.h" // for string <=> on libc++
+
+namespace org::apache::nifi::minifi::processors {
+
+class ConnectionId {
+ public:
+ ConnectionId(std::string hostname, std::string port) :
hostname_(std::move(hostname)), port_(std::move(port)) {}
+
+ struct hash {
+ std::size_t operator () (const ConnectionId& connection_id) const {
+ return
utils::hash_combine(std::hash<std::string>{}(connection_id.hostname_),
std::hash<std::string>{}(connection_id.port_));
+ }
+ };
Review Comment:
I'd prefer to specialize `std::hash` instead. It's not much more code, but
it's much easier to use with standard containers.
##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PutTCP.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "range/v3/range/conversion.hpp"
+#include "range/v3/view/transform.hpp"
+
+#include "utils/gsl.h"
+#include "utils/expected.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "controllers/SSLContextService.h"
+
+#include "asio/ssl.hpp"
+#include "asio/ip/tcp.hpp"
+#include "asio/write.hpp"
+#include "asio/high_resolution_timer.hpp"
+
+using asio::ip::tcp;
+using TcpSocket = asio::ip::tcp::socket;
+using SslSocket = asio::ssl::stream<tcp::socket>;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property PutTCP::Hostname =
core::PropertyBuilder::createProperty("Hostname")
+ ->withDescription("The ip address or hostname of the destination.")
+ ->withDefaultValue("localhost")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutTCP::Port =
core::PropertyBuilder::createProperty("Port")
+ ->withDescription("The port or service on the destination.")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutTCP::IdleConnectionExpiration =
core::PropertyBuilder::createProperty("Idle Connection Expiration")
+ ->withDescription("The amount of time a connection should be held open
without being used before closing the connection. A value of 0 seconds will
disable this feature.")
+ ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutTCP::Timeout =
core::PropertyBuilder::createProperty("Timeout")
+ ->withDescription("The timeout for connecting to and communicating with
the destination.")
+ ->withDefaultValue<core::TimePeriodValue>("15 seconds")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutTCP::ConnectionPerFlowFile =
core::PropertyBuilder::createProperty("Connection Per FlowFile")
+ ->withDescription("Specifies whether to send each FlowFile's content on an
individual connection.")
+ ->withDefaultValue(false)
+ ->isRequired(true)
+ ->supportsExpressionLanguage(false)
+ ->build();
+
+const core::Property PutTCP::OutgoingMessageDelimiter =
core::PropertyBuilder::createProperty("Outgoing Message Delimiter")
+ ->withDescription("Specifies the delimiter to use when sending messages
out over the same TCP stream. "
+ "The delimiter is appended to each FlowFile message that
is transmitted over the stream so that the receiver can determine when one
message ends and the next message begins. "
+ "Users should ensure that the FlowFile content does not
contain the delimiter character to avoid errors.")
+ ->isRequired(false)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property PutTCP::SSLContextService =
core::PropertyBuilder::createProperty("SSL Context Service")
+ ->withDescription("The Controller Service to use in order to obtain an SSL
Context. If this property is set, messages will be sent over a secure
connection.")
+ ->isRequired(false)
+ ->asType<minifi::controllers::SSLContextService>()
+ ->build();
+
+const core::Property PutTCP::MaxSizeOfSocketSendBuffer =
core::PropertyBuilder::createProperty("Max Size of Socket Send Buffer")
+ ->withDescription("The maximum size of the socket send buffer that should
be used. This is a suggestion to the Operating System to indicate how big the
socket buffer should be.")
+ ->isRequired(false)
+ ->asType<core::DataSizeValue>()
+ ->build();
+
+const core::Relationship PutTCP::Success{"success", "FlowFiles that are sent
to the destination are sent out this relationship."};
+const core::Relationship PutTCP::Failure{"failure", "FlowFiles that
encountered IO errors are send out this relationship."};
+
+PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid)
+ : Processor(name, uuid) {}
+
+PutTCP::~PutTCP() = default;
+
+void PutTCP::initialize() {
+ setSupportedProperties(properties());
+ setSupportedRelationships(relationships());
+}
+
+void PutTCP::notifyStop() {}
+
+void PutTCP::onSchedule(core::ProcessContext* const context,
core::ProcessSessionFactory*) {
+ gsl_Expects(context);
+
+ // if the required properties are missing or empty even before evaluating
the EL expression, then we can throw in onSchedule, before we waste any flow
files
+ if (context->getProperty(Hostname).value_or(std::string{}).empty()) {
+ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
+ }
+ if (context->getProperty(Port).value_or(std::string{}).empty()) {
+ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
+ }
+ if (auto idle_connection_expiration =
context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration);
idle_connection_expiration && idle_connection_expiration->getMilliseconds() >
0ms)
+ idle_connection_expiration_ =
idle_connection_expiration->getMilliseconds();
+ else
+ idle_connection_expiration_.reset();
+
+ if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout);
timeout && timeout->getMilliseconds() > 0ms)
+ timeout_ = timeout->getMilliseconds();
+ else
+ timeout_ = 15s;
+
+ std::string context_name;
+ if (context->getProperty(SSLContextService.getName(), context_name) &&
!IsNullOrEmpty(context_name))
+ ssl_context_service_ =
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name));
+
+ delimiter_.clear();
+ if (auto delimiter_str = context->getProperty(OutgoingMessageDelimiter)) {
+ delimiter_ = ranges::views::transform(*delimiter_str, [](char c) { return
static_cast<std::byte>(c); }) | ranges::to<std::vector>();
+ }
Review Comment:
I think this is simpler. It doesn't involve mutating the vector contents,
and also avoids extra allocations, since the vector is created in one step.
```suggestion
delimiter_ =
utils::span_to<std::vector>(gsl::make_span(context->getProperty(OutgoingMessageDelimiter).value_or(std::string{})).as_span<const
std::byte>());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]