martinzink commented on code in PR #1592:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1592#discussion_r1247805318
##########
extensions/standard-processors/processors/GetTCP.cpp:
##########
@@ -17,275 +17,287 @@
*/
#include "GetTCP.h"
-#ifndef WIN32
-#include <dirent.h>
-#endif
#include <cinttypes>
-#include <future>
#include <memory>
-#include <mutex>
#include <thread>
-#include <utility>
-#include <vector>
#include <string>
-#include "io/ClientSocket.h"
+#include <asio/read_until.hpp>
+#include <asio/detached.hpp>
+#include "utils/net/AsioCoro.h"
#include "io/StreamFactory.h"
#include "utils/gsl.h"
#include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessSessionFactory.h"
#include "core/PropertyBuilder.h"
#include "core/Resource.h"
-namespace org::apache::nifi::minifi::processors {
+using namespace std::literals::chrono_literals;
-const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
+namespace org::apache::nifi::minifi::processors {
const core::Property GetTCP::EndpointList(
- core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A
comma delimited list of the endpoints to connect to. The format should be
<server_address>:<port>.")->isRequired(true)
- ->build());
-
-const core::Property GetTCP::ConcurrentHandlers(
-
core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number
of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
+ core::PropertyBuilder::createProperty("Endpoint List")
+ ->withDescription("A comma delimited list of the endpoints to connect
to. The format should be <server_address>:<port>.")
+ ->isRequired(true)->build());
-const core::Property GetTCP::ReconnectInterval(
-
core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The
number of seconds to wait before attempting to reconnect to the endpoint.")
- ->withDefaultValue<core::TimePeriodValue>("5 s")->build());
-
-const core::Property GetTCP::ReceiveBufferSize(
-
core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The
size of the buffer to receive data in. Default 16384
(16MB).")->withDefaultValue<core::DataSizeValue>("16 MB")
+const core::Property GetTCP::SSLContextService(
+ core::PropertyBuilder::createProperty("SSL Context Service")
+ ->withDescription("SSL Context Service Name")
+ ->asType<minifi::controllers::SSLContextService>()->build());
+
+const core::Property GetTCP::MessageDelimiter(
+ core::PropertyBuilder::createProperty("Message
Delimiter")->withDescription(
+ "Character that denotes the end of the message.")
+ ->withDefaultValue("\\n")->build());
+
+const core::Property GetTCP::MaxQueueSize(
+ core::PropertyBuilder::createProperty("Max Size of Message Queue")
+ ->withDescription("Maximum number of messages allowed to be buffered
before processing them when the processor is triggered. "
+ "If the buffer is full, the message is ignored. If
set to zero the buffer is unlimited.")
+ ->withDefaultValue<uint64_t>(10000)
+ ->isRequired(true)
->build());
-const core::Property GetTCP::SSLContextService(
- core::PropertyBuilder::createProperty("SSL Context
Service")->withDescription("SSL Context Service
Name")->asType<minifi::controllers::SSLContextService>()->build());
+const core::Property GetTCP::MaxBatchSize(
+ core::PropertyBuilder::createProperty("Max Batch Size")
+ ->withDescription("The maximum number of messages to process at a
time.")
+ ->withDefaultValue<uint64_t>(500)
+ ->isRequired(true)
+ ->build());
-const core::Property GetTCP::StayConnected(
- core::PropertyBuilder::createProperty("Stay
Connected")->withDescription("Determines if we keep the same socket despite
having no data")->withDefaultValue<bool>(true)->build());
+const core::Property GetTCP::MaxMessageSize(
+ core::PropertyBuilder::createProperty("Maximum Message Size")
+ ->withDescription("Optional size of the buffer to receive data
in.")->build());
-const core::Property GetTCP::ConnectionAttemptLimit(
-
core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum
number of connection attempts before attempting backup hosts, if
configured")->withDefaultValue<int>(
- 3)->build());
+const core::Property GetTCP::Timeout =
core::PropertyBuilder::createProperty("Timeout")
+ ->withDescription("The timeout for connecting to and communicating with
the destination.")
+ ->withDefaultValue<core::TimePeriodValue>("1s")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
-const core::Property GetTCP::EndOfMessageByte(
-
core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
- "Byte value which denotes end of message. Must be specified as integer
within the valid byte range (-128 thru 127). For example, '13' = Carriage
return and '10' = New line. Default '13'.")
- ->withDefaultValue("13")->build());
+const core::Property GetTCP::ReconnectInterval =
core::PropertyBuilder::createProperty("Reconnection Interval")
+ ->withDescription("The duration to wait before attempting to reconnect to
the endpoints.")
+ ->withDefaultValue<core::TimePeriodValue>("1 min")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
const core::Relationship GetTCP::Success("success", "All files are routed to
success");
const core::Relationship GetTCP::Partial("partial", "Indicates an incomplete
message as a result of encountering the end of message byte trigger");
-int16_t DataHandler::handle(const std::string& source, uint8_t *message,
size_t size, bool partial) {
- std::shared_ptr<core::ProcessSession> my_session =
sessionFactory_->createSession();
- std::shared_ptr<core::FlowFile> flowFile = my_session->create();
-
- my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const
std::byte*>(message), size));
-
- my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
-
- if (partial) {
- my_session->transfer(flowFile, GetTCP::Partial);
- } else {
- my_session->transfer(flowFile, GetTCP::Success);
- }
-
- my_session->commit();
-
- return 0;
-}
void GetTCP::initialize() {
setSupportedProperties(properties());
setSupportedRelationships(relationships());
}
-void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
- std::string value;
- if (context->getProperty(EndpointList.getName(), value)) {
- endpoints = utils::StringUtils::split(value, ",");
+void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context,
const std::shared_ptr<core::ProcessSessionFactory>&) {
+ std::vector<utils::net::ConnectionId> connections_to_make;
+ if (auto endpoint_list_str = context->getProperty(EndpointList)) {
+ for (const auto& endpoint_str :
utils::StringUtils::splitAndTrim(*endpoint_list_str, ",")) {
+ auto hostname_service_pair =
utils::StringUtils::splitAndTrim(endpoint_str, ":");
+ if (hostname_service_pair.size() != 2) {
+ logger_->log_error("%s endpoint is invalid, expected
{hostname}:{service} format", endpoint_str);
+ continue;
+ }
+ connections_to_make.emplace_back(hostname_service_pair[0],
hostname_service_pair[1]);
+ }
}
- int handlers = 0;
- if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
- concurrent_handlers_ = handlers;
- }
+ if (connections_to_make.empty())
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "No valid endpoint in Endpoint
List property");
Review Comment:
Good idea; I changed it in
https://github.com/apache/nifi-minifi-cpp/pull/1592/commits/54e037239cd49365a89976cc280c7bfcc0e198aa#diff-8d484e99475c978743af82b7ee8ba45954457ecbac4c033744d1cc95ad9f8240R112
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org