martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1236810704


##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
  */
 #include "GetTCP.h"
 
-#ifndef WIN32
-#include <dirent.h>
-#endif
 #include <cinttypes>
-#include <future>
 #include <memory>
-#include <mutex>
 #include <thread>
-#include <utility>
-#include <vector>
 #include <string>
 
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
 #include "io/StreamFactory.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
 
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property GetTCP::EndpointList(
-    core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A 
comma delimited list of the endpoints to connect to. The format should be 
<server_address>:<port>.")->isRequired(true)
-        ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-    
core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number
 of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+    core::PropertyBuilder::createProperty("Endpoint List")
+      ->withDescription("A comma delimited list of the endpoints to connect 
to. The format should be <server_address>:<port>.")
+      ->isRequired(true)->build());
 
-const core::Property GetTCP::ReconnectInterval(
-    
core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The
 number of seconds to wait before attempting to reconnect to the endpoint.")
-        ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-    
core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The
 size of the buffer to receive data in. Default 16384 
(16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+      ->withDescription("SSL Context Service Name")
+      ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+    core::PropertyBuilder::createProperty("Message 
Delimiter")->withDescription(
+        "Character that denotes the end of the message.")
+        ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of messages allowed to be buffered 
before processing them when the processor is triggered. "
+                          "If the buffer is full, the message is ignored. If 
set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(10000)
+        ->isRequired(true)
         ->build());
 
-const core::Property GetTCP::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context 
Service")->withDescription("SSL Context Service 
Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of messages to process at a 
time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->isRequired(true)
+        ->build());
 
-const core::Property GetTCP::StayConnected(
-    core::PropertyBuilder::createProperty("Stay 
Connected")->withDescription("Determines if we keep the same socket despite 
having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Maximum Message Size")
+      ->withDescription("Optional size of the buffer to receive data 
in.")->build());
 
-const core::Property GetTCP::ConnectionAttemptLimit(
-    
core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum
 number of connection attempts before attempting backup hosts, if 
configured")->withDefaultValue<int>(
-        3)->build());
+const core::Property GetTCP::Timeout = 
core::PropertyBuilder::createProperty("Timeout")
+    ->withDescription("The timeout for connecting to and communicating with 
the destination.")
+    ->withDefaultValue<core::TimePeriodValue>("1s")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
-const core::Property GetTCP::EndOfMessageByte(
-    
core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
-        "Byte value which denotes end of message. Must be specified as integer 
within the valid byte range  (-128 thru 127). For example, '13' = Carriage 
return and '10' = New line. Default '13'.")
-        ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval = 
core::PropertyBuilder::createProperty("Reconnection Interval")
+    ->withDescription("The duration to wait before attempting to reconnect to 
the endpoints.")
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
 
 const core::Relationship GetTCP::Success("success", "All files are routed to 
success");
 const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete 
message as a result of encountering the end of message byte trigger");
 
-int16_t DataHandler::handle(const std::string& source, uint8_t *message, 
size_t size, bool partial) {
-  std::shared_ptr<core::ProcessSession> my_session = 
sessionFactory_->createSession();
-  std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
-  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const 
std::byte*>(message), size));
-
-  my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
-  if (partial) {
-    my_session->transfer(flowFile, GetTCP::Partial);
-  } else {
-    my_session->transfer(flowFile, GetTCP::Success);
-  }
-
-  my_session->commit();
-
-  return 0;
-}
 void GetTCP::initialize() {
   setSupportedProperties(properties());
   setSupportedRelationships(relationships());
 }
 
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  std::string value;
-  if (context->getProperty(EndpointList.getName(), value)) {
-    endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, 
const std::shared_ptr<core::ProcessSessionFactory>&) {
+  std::vector<utils::net::ConnectionId> connections_to_make;
+  if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+     for (const auto& endpoint_str : 
utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+        auto hostname_service_pair = 
utils::StringUtils::splitAndTrim(endpoint_str, ":");
+        if (hostname_service_pair.size() != 2) {
+          logger_->log_error("%s endpoint is invalid, expected 
{hostname}:{service} format", endpoint_str);
+          continue;
+        }
+       connections_to_make.emplace_back(hostname_service_pair[0], 
hostname_service_pair[1]);
+     }
   }
 
-  int handlers = 0;
-  if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
-    concurrent_handlers_ = handlers;
-  }
+  if (connections_to_make.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in 
endpoint-list property");
 
-  stay_connected_ = true;
-  if (context->getProperty(StayConnected.getName(), value)) {
-    stay_connected_ = utils::StringUtils::toBool(value).value_or(true);
+  char delimiter = '\n';
+  if (auto delimiter_str = context->getProperty(MessageDelimiter)) {
+    auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str);
+    if (!parsed_delimiter)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid 
delimiter: {} (it must be a single (escaped or not) character", 
*delimiter_str));
+    delimiter = **parsed_delimiter;

Review Comment:
   You are right, I've fixed this (unsuccessfully) in 
https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/9dce92909600a74ba320bd3c56612acd0c8ab5fa
 (and fixed the fix in 
https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/c824ab601e3bbdc01177a55496096ddc625356b1)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to