martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085089224
##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -160,339 +178,147 @@ void PutTCP::onSchedule(core::ProcessContext* const
context, core::ProcessSessio
}
namespace {
+template<class SocketType>
+asio::awaitable<std::tuple<std::error_code>> handshake(SocketType&,
asio::steady_timer::duration) {
+ co_return std::error_code();
+}
+
+template<>
+asio::awaitable<std::tuple<std::error_code>> handshake(SslSocket& socket,
asio::steady_timer::duration timeout_duration) {
+ co_return co_await
asyncOperationWithTimeout(socket.async_handshake(HandshakeType::client,
use_nothrow_awaitable), timeout_duration); // NOLINT
+}
+
template<class SocketType>
class ConnectionHandler : public ConnectionHandlerBase {
public:
ConnectionHandler(detail::ConnectionId connection_id,
std::chrono::milliseconds timeout,
std::shared_ptr<core::logging::Logger> logger,
std::optional<size_t> max_size_of_socket_send_buffer,
- std::shared_ptr<controllers::SSLContextService>
ssl_context_service)
+ std::optional<asio::ssl::context>& ssl_context)
: connection_id_(std::move(connection_id)),
- timeout_(timeout),
+ timeout_duration_(timeout),
logger_(std::move(logger)),
max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer),
- ssl_context_service_(std::move(ssl_context_service)) {
+ ssl_context_(ssl_context) {
}
~ConnectionHandler() override = default;
- nonstd::expected<void, std::error_code> sendData(const
std::shared_ptr<io::InputStream>& flow_file_content_stream, const
std::vector<std::byte>& delimiter) override;
+ asio::awaitable<std::error_code> sendStreamWithDelimiter(const
std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>&
delimiter, asio::io_context& io_context_) override;
private:
- nonstd::expected<std::shared_ptr<SocketType>, std::error_code> getSocket();
-
[[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const
override {
- return last_used_ && *last_used_ >= (std::chrono::steady_clock::now() -
dur);
+ return last_used_ && *last_used_ >= (steady_clock::now() - dur);
}
void reset() override {
last_used_.reset();
socket_.reset();
- io_context_.reset();
- last_error_.clear();
- deadline_.expires_at(asio::steady_timer::time_point::max());
}
- void checkDeadline(std::error_code error_code, SocketType* socket);
- void startConnect(tcp::resolver::results_type::iterator endpoint_iter, const
std::shared_ptr<SocketType>& socket);
-
- void handleConnect(std::error_code error,
- tcp::resolver::results_type::iterator endpoint_iter,
- const std::shared_ptr<SocketType>& socket);
- void handleConnectionSuccess(const tcp::resolver::results_type::iterator&
endpoint_iter,
- const std::shared_ptr<SocketType>& socket);
- void handleHandshake(std::error_code error,
- const tcp::resolver::results_type::iterator&
endpoint_iter,
- const std::shared_ptr<SocketType>& socket);
-
- void handleWrite(std::error_code error,
- std::size_t bytes_written,
- const std::shared_ptr<io::InputStream>&
flow_file_content_stream,
- const std::vector<std::byte>& delimiter,
- const std::shared_ptr<SocketType>& socket);
+ [[nodiscard]] bool hasBeenUsed() const override { return
last_used_.has_value(); }
+ [[nodiscard]] asio::awaitable<std::error_code>
setupUsableSocket(asio::io_context& io_context);
+ [[nodiscard]] bool hasUsableSocket() const { return socket_ &&
socket_->lowest_layer().is_open(); }
- void handleDelimiterWrite(std::error_code error, std::size_t bytes_written,
const std::shared_ptr<SocketType>& socket);
+ asio::awaitable<std::error_code> establishNewConnection(const
tcp::resolver::results_type& endpoints, asio::io_context& io_context_);
+ asio::awaitable<std::error_code> send(const
std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>&
delimiter);
- nonstd::expected<std::shared_ptr<SocketType>, std::error_code>
establishConnection(const tcp::resolver::results_type& resolved_query);
-
- [[nodiscard]] bool hasBeenUsed() const override { return
last_used_.has_value(); }
+ SocketType createNewSocket(asio::io_context& io_context_);
detail::ConnectionId connection_id_;
- std::optional<std::chrono::steady_clock::time_point> last_used_;
- asio::io_context io_context_;
- std::error_code last_error_;
- asio::steady_timer deadline_{io_context_};
- std::chrono::milliseconds timeout_;
- std::shared_ptr<SocketType> socket_;
+ std::optional<SocketType> socket_;
+
+ std::optional<steady_clock::time_point> last_used_;
+ std::chrono::milliseconds timeout_duration_;
std::shared_ptr<core::logging::Logger> logger_;
std::optional<size_t> max_size_of_socket_send_buffer_;
- std::shared_ptr<controllers::SSLContextService> ssl_context_service_;
-
- nonstd::expected<tcp::resolver::results_type, std::error_code>
resolveHostname();
- nonstd::expected<void, std::error_code> sendDataToSocket(const
std::shared_ptr<SocketType>& socket,
- const
std::shared_ptr<io::InputStream>& flow_file_content_stream,
- const
std::vector<std::byte>& delimiter);
+ std::optional<asio::ssl::context>& ssl_context_;
Review Comment:
Good idea, I've changed it in
https://github.com/apache/nifi-minifi-cpp/pull/1457/commits/f37cffb3fe783683d38646b198dfc38f7a439ce4#diff-a62ddb0368e1813dff2a90fe7433e148bfc7b21b7d7ad9c8a066fd040d209ac9R237
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]