martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085095088
##########
libminifi/src/utils/net/TcpServer.cpp:
##########
@@ -15,53 +15,73 @@
* limitations under the License.
*/
#include "utils/net/TcpServer.h"
+#include "utils/net/AsioCoro.h"
namespace org::apache::nifi::minifi::utils::net {
-TcpSession::TcpSession(asio::io_context& io_context,
utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t>
max_queue_size, std::shared_ptr<core::logging::Logger> logger)
- : concurrent_queue_(concurrent_queue),
- max_queue_size_(max_queue_size),
- socket_(io_context),
- logger_(std::move(logger)) {
+asio::awaitable<void> TcpServer::doReceive() {
+ asio::ip::tcp::acceptor acceptor(io_context_,
asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+ if (port_ == 0)
+ port_ = acceptor.local_endpoint().port();
+ while (true) {
+ auto [accept_error, socket] = co_await
acceptor.async_accept(use_nothrow_awaitable);
+ if (accept_error) {
+ logger_->log_error("Error during accepting new connection: %s",
accept_error.message());
+ break;
+ }
+ if (ssl_data_)
+ co_spawn(io_context_, secureSession(std::move(socket)), asio::detached);
+ else
+ co_spawn(io_context_, insecureSession(std::move(socket)),
asio::detached);
+ }
}
-asio::ip::tcp::socket& TcpSession::getSocket() {
- return socket_;
-}
+asio::awaitable<void> TcpServer::readLoop(auto& socket) {
+ std::string read_message;
+ while (true) {
+ auto [read_error, bytes_read] = co_await asio::async_read_until(socket,
asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable); // NOLINT
+ if (read_error || bytes_read == 0)
+ co_return;
-void TcpSession::start() {
- asio::async_read_until(socket_,
- buffer_,
- '\n',
- [self = shared_from_this()](const auto& error_code,
size_t) -> void {
- self->handleReadUntilNewLine(error_code);
- });
+ if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+ concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read -
1), IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(),
socket.lowest_layer().local_endpoint().port()));
Review Comment:
That shouldn't be possible based on the documentation. If
async_read_until returns before it has encountered the delimiter, then
read_error will be set accordingly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]