martinzink commented on code in PR #1457:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1085095088


##########
libminifi/src/utils/net/TcpServer.cpp:
##########
@@ -15,53 +15,73 @@
  * limitations under the License.
  */
 #include "utils/net/TcpServer.h"
+#include "utils/net/AsioCoro.h"
 
 namespace org::apache::nifi::minifi::utils::net {
 
-TcpSession::TcpSession(asio::io_context& io_context, 
utils::ConcurrentQueue<Message>& concurrent_queue, std::optional<size_t> 
max_queue_size, std::shared_ptr<core::logging::Logger> logger)
-  : concurrent_queue_(concurrent_queue),
-    max_queue_size_(max_queue_size),
-    socket_(io_context),
-    logger_(std::move(logger)) {
+asio::awaitable<void> TcpServer::doReceive() {
+  asio::ip::tcp::acceptor acceptor(io_context_, 
asio::ip::tcp::endpoint(asio::ip::tcp::v6(), port_));
+  if (port_ == 0)
+    port_ = acceptor.local_endpoint().port();
+  while (true) {
+    auto [accept_error, socket] = co_await 
acceptor.async_accept(use_nothrow_awaitable);
+    if (accept_error) {
+      logger_->log_error("Error during accepting new connection: %s", 
accept_error.message());
+      break;
+    }
+    if (ssl_data_)
+      co_spawn(io_context_, secureSession(std::move(socket)), asio::detached);
+    else
+      co_spawn(io_context_, insecureSession(std::move(socket)), 
asio::detached);
+  }
 }
 
-asio::ip::tcp::socket& TcpSession::getSocket() {
-  return socket_;
-}
+asio::awaitable<void> TcpServer::readLoop(auto& socket) {
+  std::string read_message;
+  while (true) {
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, 
asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable);  // NOLINT
+    if (read_error || bytes_read == 0)
+      co_return;
 
-void TcpSession::start() {
-  asio::async_read_until(socket_,
-                         buffer_,
-                         '\n',
-                         [self = shared_from_this()](const auto& error_code, 
size_t) -> void {
-                           self->handleReadUntilNewLine(error_code);
-                         });
+    if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
+      concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read - 
1), IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), 
socket.lowest_layer().local_endpoint().port()));

Review Comment:
   That shouldn't be possible based on the documentation. If 
async_read_until returns before it has encountered the delimiter, then 
read_error will be set accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to