arpadboda commented on code in PR #1294:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1294#discussion_r849376987


##########
PROCESSORS.md:
##########
@@ -1002,27 +1002,52 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 
 ### Description
 
-Listens for Syslog messages being sent to a given port over TCP or UDP. 
Incoming messages are checked against regular expressions for RFC5424 and 
RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION 
)(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The timestamp can be 
an RFC5424 timestamp with a format of "yyyy-MM-dd'T'HH:mm:ss.SZ" or 
"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm", or it can be an RFC3164 timestamp with a 
format of "MMM d HH:mm:ss". If an incoming messages matches one of these 
patterns, the message will be parsed and the individual pieces will be placed 
in FlowFile attributes, with the original message in the content of the 
FlowFile. If an incoming message does not match one of these patterns it will 
not be parsed and the syslog.valid attribute will be set to false with the 
original message in the content of the FlowFile. Valid messages will be 
transferred on the success relationship, and invalid messages will be 
transferred on the invalid relat
 ionship.
+Listens for Syslog messages being sent to a given port over TCP or UDP.
+Incoming messages are optionally checked against regular expressions for 
RFC5424 and RFC3164 formatted messages.
+With parsing enabled the individual parts of the message will be placed as 
FlowFile attributes and valid messages will be transferred to success 
relationship, while invalid messages will be transferred to invalid 
relationship.
+With parsing disabled all message will be routed to the success relationship, 
but they will only contain the sender, protocol, and port attributes.
+
+
 ### Properties
 
 In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description |
-| - | - | - | - |
-|Max Batch Size|1||The maximum number of Syslog events to add to a single 
FlowFile.|
-|Max Number of TCP Connections|2||The maximum number of concurrent connections 
to accept Syslog messages in TCP mode.|
-|Max Size of Socket Buffer|1 MB||The maximum size of the socket buffer that 
should be used.|
-|Message Delimiter|\n||Specifies the delimiter to place between Syslog 
messages when multiple messages are bundled together (see <Max Batch Size> 
core::Property).|
-|Parse Messages|false||Indicates if the processor should parse the Syslog 
messages. If set to false, each outgoing FlowFile will only.|
-|Port|514||The port for Syslog communication|
-|Protocol|UDP|UDP<br>TCP<br>|The protocol for Syslog communication.|
-|Receive Buffer Size|65507 B||The size of each buffer used to receive Syslog 
messages.|
+| Name                      | Default Value | Allowable Values | Description   
                                                                                
                                                                                
                       |
+|---------------------------|---------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Listening Port            | 514           |                  | The port for 
Syslog communication.                                                           
                                                                                
                        |

Review Comment:
   I think a warning here would be useful about the root privileges required 
when using the default port.



##########
extensions/standard-processors/processors/ListenSyslog.cpp:
##########
@@ -17,318 +14,272 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+
 #include "ListenSyslog.h"
-#include <stdio.h>
-#include <memory>
-#include <string>
-#include <vector>
-#include <set>
-#include <queue>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include "core/TypedValues.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-#ifndef WIN32
-core::Property ListenSyslog::RecvBufSize(
-    core::PropertyBuilder::createProperty("Receive Buffer 
Size")->withDescription("The size of each buffer used to receive Syslog 
messages.")->
-    withDefaultValue<core::DataSizeValue>("65507 B")->build());
-
-core::Property ListenSyslog::MaxSocketBufSize(
-    core::PropertyBuilder::createProperty("Max Size of Socket 
Buffer")->withDescription("The maximum size of the socket buffer that should be 
used.")->withDefaultValue<core::DataSizeValue>("1 MB")
-        ->build());
+namespace org::apache::nifi::minifi::processors {
 
-core::Property ListenSyslog::MaxConnections(
-    core::PropertyBuilder::createProperty("Max Number of TCP 
Connections")->withDescription("The maximum number of concurrent connections to 
accept Syslog messages in TCP mode.")
-        ->withDefaultValue<int>(2)->build());
+const core::Property ListenSyslog::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port for Syslog communication.")
+        ->isRequired(true)
+        ->withDefaultValue<int>(514, 
core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build());
 
-core::Property ListenSyslog::MaxBatchSize(
-    core::PropertyBuilder::createProperty("Max Batch 
Size")->withDescription("The maximum number of Syslog events to add to a single 
FlowFile.")->withDefaultValue<int>(1)->build());
+const core::Property ListenSyslog::ProtocolProperty(
+    core::PropertyBuilder::createProperty("Protocol")
+        ->withDescription("The protocol for Syslog communication.")
+        ->isRequired(true)
+        ->withAllowableValues(Protocol::values())
+        ->withDefaultValue(toString(Protocol::UDP))
+        ->build());
 
-core::Property ListenSyslog::MessageDelimiter(
-    core::PropertyBuilder::createProperty("Message 
Delimiter")->withDescription("Specifies the delimiter to place between Syslog 
messages when multiple "
-                                                                               
 "messages are bundled together (see <Max Batch Size> 
core::Property).")->withDefaultValue("\n")->build());
+const core::Property ListenSyslog::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of Syslog events to process at a 
time.")
+        ->withDefaultValue<uint64_t>(500, 
std::make_shared<core::UnsignedLongValidator>("Greater or equal than 1 
validator", 1))
+        ->build());
 
-core::Property ListenSyslog::ParseMessages(
-    core::PropertyBuilder::createProperty("Parse 
Messages")->withDescription("Indicates if the processor should parse the Syslog 
messages. If set to false, each outgoing FlowFile will only.")
+const core::Property ListenSyslog::ParseMessages(
+    core::PropertyBuilder::createProperty("Parse Messages")
+        ->withDescription("Indicates if the processor should parse the Syslog 
messages. "
+                          "If set to false, each outgoing FlowFile will only 
contain the sender, protocol, and port, and no additional attributes.")
         ->withDefaultValue<bool>(false)->build());
 
-core::Property ListenSyslog::Protocol(
-    core::PropertyBuilder::createProperty("Protocol")->withDescription("The 
protocol for Syslog 
communication.")->withAllowableValue<std::string>("UDP")->withAllowableValue("TCP")->withDefaultValue(
-        "UDP")->build());
+const core::Property ListenSyslog::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of Syslog messages allowed to be 
buffered before processing them when the processor is triggered. "
+                          "If the buffer full, the message is ignored. If set 
to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(0)->build());
+
+const core::Relationship ListenSyslog::Success("success", "Incoming messages 
that match the expected format when parsing will be sent to this relationship. "
+                                                          "When Parse Messages 
is set to false, all incoming message will be sent to this relationship.");
+const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages 
that do not match the expected format when parsing will be sent to this 
relationship.");
+
 
-core::Property ListenSyslog::Port(
-    core::PropertyBuilder::createProperty("Port")->withDescription("The port 
for Syslog communication")->withDefaultValue<int64_t>(514, 
core::StandardValidators::get().PORT_VALIDATOR)->build());
+const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_(
+    R"(^<(?:(\d|\d{2}|1[1-8]\d|19[01]))>)"                                     
                               // priority
+    R"((?:(\d{1,2}))\s)"                                                       
                               // version
+    
R"((?:(\d{4}[-]\d{2}[-]\d{2}[T]\d{2}[:]\d{2}[:]\d{2}(?:\.\d{1,6})?(?:[+-]\d{2}[:]\d{2}|Z)?)|-)\s)"
        // timestamp
+    R"((?:([\S]{1,255}))\s)"                                                   
                               // hostname
+    R"((?:([\S]{1,48}))\s)"                                                    
                               // app_name
+    R"((?:([\S]{1,128}))\s)"                                                   
                               // proc_id
+    R"((?:([\S]{1,32}))\s)"                                                    
                               // msg_id
+    R"((?:(-|(?:\[.+?\])+))\s?)"                                               
                               // structured_data
+    R"((?:((?:.+)))?$)", std::regex::ECMAScript);                              
                               // msg
 
-core::Relationship ListenSyslog::Success("success", "All files are routed to 
success");
-core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format 
invalid");
+const std::regex ListenSyslog::SyslogMessage::rfc3164_pattern_(
+    R"((?:\<(\d{1,3})\>))"                                                     
                               // priority
+    R"(([A-Z][a-z][a-z]\s{1,2}\d{1,2}\s\d{2}[:]\d{2}[:]\d{2})\s)"              
                               // timestamp
+    R"(([\w][\w\d(\.|\:)@-]*)\s)"                                              
                               // hostname
+    R"((.*)$)", std::regex::ECMAScript);                                       
                               // msg
 
 void ListenSyslog::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(RecvBufSize);
-  properties.insert(MaxSocketBufSize);
-  properties.insert(MaxConnections);
-  properties.insert(MaxBatchSize);
-  properties.insert(MessageDelimiter);
-  properties.insert(ParseMessages);
-  properties.insert(Protocol);
-  properties.insert(Port);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Invalid);
-  setSupportedRelationships(relationships);
+  setSupportedProperties({Port, ProtocolProperty, MaxBatchSize, ParseMessages, 
MaxQueueSize});
+  setSupportedRelationships({Success, Invalid});
 }
 
-void ListenSyslog::startSocketThread() {
-  if (_thread != NULL)
-    return;
+void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context && !server_thread_.joinable() && !server_);
 
-  logger_->log_trace("ListenSysLog Socket Thread Start");
-  _serverTheadRunning = true;
-  _thread = new std::thread(run, this);
-  _thread->detach();
-}
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  context->getProperty(ParseMessages.getName(), parse_messages_);
 
-void ListenSyslog::run(ListenSyslog *process) {
-  process->runThread();
-}
+  uint64_t max_queue_size = 0;
+  context->getProperty(MaxQueueSize.getName(), max_queue_size);
+  max_queue_size_ = max_queue_size > 0 ? 
std::optional<uint64_t>(max_queue_size) : std::nullopt;
 
-void ListenSyslog::runThread() {
-  while (_serverTheadRunning) {
-    if (_resetServerSocket) {
-      _resetServerSocket = false;
-      // need to reset the socket
-      std::vector<int>::iterator it;
-      for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-        int clientSocket = *it;
-        close(clientSocket);
-      }
-      _clientSockets.clear();
-      if (_serverSocket > 0) {
-        close(_serverSocket);
-        _serverSocket = 0;
-      }
-    }
+  Protocol protocol = Protocol(ProtocolProperty.getDefaultValue());
+  if (!context->getProperty(ProtocolProperty.getName(), protocol)) {
+    logger_->log_error("Missing or invalid Protocol: defaulting to %s", 
protocol.toString());

Review Comment:
   Not sure if this can occur at all given the validator, but in case it does, 
it should throw an exception instead of logging an error and falling back to the default.



##########
extensions/standard-processors/processors/ListenSyslog.cpp:
##########
@@ -17,318 +14,272 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+
 #include "ListenSyslog.h"
-#include <stdio.h>
-#include <memory>
-#include <string>
-#include <vector>
-#include <set>
-#include <queue>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include "core/TypedValues.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-#ifndef WIN32
-core::Property ListenSyslog::RecvBufSize(
-    core::PropertyBuilder::createProperty("Receive Buffer 
Size")->withDescription("The size of each buffer used to receive Syslog 
messages.")->
-    withDefaultValue<core::DataSizeValue>("65507 B")->build());
-
-core::Property ListenSyslog::MaxSocketBufSize(
-    core::PropertyBuilder::createProperty("Max Size of Socket 
Buffer")->withDescription("The maximum size of the socket buffer that should be 
used.")->withDefaultValue<core::DataSizeValue>("1 MB")
-        ->build());
+namespace org::apache::nifi::minifi::processors {
 
-core::Property ListenSyslog::MaxConnections(
-    core::PropertyBuilder::createProperty("Max Number of TCP 
Connections")->withDescription("The maximum number of concurrent connections to 
accept Syslog messages in TCP mode.")
-        ->withDefaultValue<int>(2)->build());
+const core::Property ListenSyslog::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port for Syslog communication.")
+        ->isRequired(true)
+        ->withDefaultValue<int>(514, 
core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build());
 
-core::Property ListenSyslog::MaxBatchSize(
-    core::PropertyBuilder::createProperty("Max Batch 
Size")->withDescription("The maximum number of Syslog events to add to a single 
FlowFile.")->withDefaultValue<int>(1)->build());
+const core::Property ListenSyslog::ProtocolProperty(
+    core::PropertyBuilder::createProperty("Protocol")
+        ->withDescription("The protocol for Syslog communication.")
+        ->isRequired(true)
+        ->withAllowableValues(Protocol::values())
+        ->withDefaultValue(toString(Protocol::UDP))
+        ->build());
 
-core::Property ListenSyslog::MessageDelimiter(
-    core::PropertyBuilder::createProperty("Message 
Delimiter")->withDescription("Specifies the delimiter to place between Syslog 
messages when multiple "
-                                                                               
 "messages are bundled together (see <Max Batch Size> 
core::Property).")->withDefaultValue("\n")->build());
+const core::Property ListenSyslog::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of Syslog events to process at a 
time.")
+        ->withDefaultValue<uint64_t>(500, 
std::make_shared<core::UnsignedLongValidator>("Greater or equal than 1 
validator", 1))
+        ->build());
 
-core::Property ListenSyslog::ParseMessages(
-    core::PropertyBuilder::createProperty("Parse 
Messages")->withDescription("Indicates if the processor should parse the Syslog 
messages. If set to false, each outgoing FlowFile will only.")
+const core::Property ListenSyslog::ParseMessages(
+    core::PropertyBuilder::createProperty("Parse Messages")
+        ->withDescription("Indicates if the processor should parse the Syslog 
messages. "
+                          "If set to false, each outgoing FlowFile will only 
contain the sender, protocol, and port, and no additional attributes.")
         ->withDefaultValue<bool>(false)->build());
 
-core::Property ListenSyslog::Protocol(
-    core::PropertyBuilder::createProperty("Protocol")->withDescription("The 
protocol for Syslog 
communication.")->withAllowableValue<std::string>("UDP")->withAllowableValue("TCP")->withDefaultValue(
-        "UDP")->build());
+const core::Property ListenSyslog::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of Syslog messages allowed to be 
buffered before processing them when the processor is triggered. "
+                          "If the buffer full, the message is ignored. If set 
to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(0)->build());
+
+const core::Relationship ListenSyslog::Success("success", "Incoming messages 
that match the expected format when parsing will be sent to this relationship. "
+                                                          "When Parse Messages 
is set to false, all incoming message will be sent to this relationship.");
+const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages 
that do not match the expected format when parsing will be sent to this 
relationship.");
+
 
-core::Property ListenSyslog::Port(
-    core::PropertyBuilder::createProperty("Port")->withDescription("The port 
for Syslog communication")->withDefaultValue<int64_t>(514, 
core::StandardValidators::get().PORT_VALIDATOR)->build());
+const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_(
+    R"(^<(?:(\d|\d{2}|1[1-8]\d|19[01]))>)"                                     
                               // priority
+    R"((?:(\d{1,2}))\s)"                                                       
                               // version
+    
R"((?:(\d{4}[-]\d{2}[-]\d{2}[T]\d{2}[:]\d{2}[:]\d{2}(?:\.\d{1,6})?(?:[+-]\d{2}[:]\d{2}|Z)?)|-)\s)"
        // timestamp
+    R"((?:([\S]{1,255}))\s)"                                                   
                               // hostname
+    R"((?:([\S]{1,48}))\s)"                                                    
                               // app_name
+    R"((?:([\S]{1,128}))\s)"                                                   
                               // proc_id
+    R"((?:([\S]{1,32}))\s)"                                                    
                               // msg_id
+    R"((?:(-|(?:\[.+?\])+))\s?)"                                               
                               // structured_data
+    R"((?:((?:.+)))?$)", std::regex::ECMAScript);                              
                               // msg
 
-core::Relationship ListenSyslog::Success("success", "All files are routed to 
success");
-core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format 
invalid");
+const std::regex ListenSyslog::SyslogMessage::rfc3164_pattern_(
+    R"((?:\<(\d{1,3})\>))"                                                     
                               // priority
+    R"(([A-Z][a-z][a-z]\s{1,2}\d{1,2}\s\d{2}[:]\d{2}[:]\d{2})\s)"              
                               // timestamp
+    R"(([\w][\w\d(\.|\:)@-]*)\s)"                                              
                               // hostname
+    R"((.*)$)", std::regex::ECMAScript);                                       
                               // msg
 
 void ListenSyslog::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(RecvBufSize);
-  properties.insert(MaxSocketBufSize);
-  properties.insert(MaxConnections);
-  properties.insert(MaxBatchSize);
-  properties.insert(MessageDelimiter);
-  properties.insert(ParseMessages);
-  properties.insert(Protocol);
-  properties.insert(Port);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Invalid);
-  setSupportedRelationships(relationships);
+  setSupportedProperties({Port, ProtocolProperty, MaxBatchSize, ParseMessages, 
MaxQueueSize});
+  setSupportedRelationships({Success, Invalid});
 }
 
-void ListenSyslog::startSocketThread() {
-  if (_thread != NULL)
-    return;
+void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context && !server_thread_.joinable() && !server_);
 
-  logger_->log_trace("ListenSysLog Socket Thread Start");
-  _serverTheadRunning = true;
-  _thread = new std::thread(run, this);
-  _thread->detach();
-}
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  context->getProperty(ParseMessages.getName(), parse_messages_);
 
-void ListenSyslog::run(ListenSyslog *process) {
-  process->runThread();
-}
+  uint64_t max_queue_size = 0;
+  context->getProperty(MaxQueueSize.getName(), max_queue_size);
+  max_queue_size_ = max_queue_size > 0 ? 
std::optional<uint64_t>(max_queue_size) : std::nullopt;
 
-void ListenSyslog::runThread() {
-  while (_serverTheadRunning) {
-    if (_resetServerSocket) {
-      _resetServerSocket = false;
-      // need to reset the socket
-      std::vector<int>::iterator it;
-      for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-        int clientSocket = *it;
-        close(clientSocket);
-      }
-      _clientSockets.clear();
-      if (_serverSocket > 0) {
-        close(_serverSocket);
-        _serverSocket = 0;
-      }
-    }
+  Protocol protocol = Protocol(ProtocolProperty.getDefaultValue());
+  if (!context->getProperty(ProtocolProperty.getName(), protocol)) {
+    logger_->log_error("Missing or invalid Protocol: defaulting to %s", 
protocol.toString());
+  }
 
-    if (_serverSocket <= 0) {
-      uint16_t portno = _port;
-      struct sockaddr_in serv_addr;
-      int sockfd;
-      if (_protocol == "TCP")
-        sockfd = socket(AF_INET, SOCK_STREAM, 0);
-      else
-        sockfd = socket(AF_INET, SOCK_DGRAM, 0);
-      if (sockfd < 0) {
-        logger_->log_error("ListenSysLog Server socket creation failed");
-        break;
-      }
-      bzero(reinterpret_cast<char *>(&serv_addr), sizeof(serv_addr));
-      serv_addr.sin_family = AF_INET;
-      serv_addr.sin_addr.s_addr = INADDR_ANY;
-      serv_addr.sin_port = htons(portno);
-      if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) 
{
-        logger_->log_error("ListenSysLog Server socket bind failed");
-        break;
-      }
-      if (_protocol == "TCP")
-        listen(sockfd, 5);
-      _serverSocket = sockfd;
-      logger_->log_info("ListenSysLog Server socket %d bind OK to port %d", 
_serverSocket, portno);
-    }
-    FD_ZERO(&_readfds);
-    FD_SET(_serverSocket, &_readfds);
-    _maxFds = _serverSocket;
-    std::vector<int>::iterator it;
-    for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-      int clientSocket = *it;
-      if (clientSocket >= _maxFds)
-        _maxFds = clientSocket;
-      FD_SET(clientSocket, &_readfds);
-    }
-    fd_set fds;
-    struct timeval tv;
-    int retval;
-    fds = _readfds;
-    tv.tv_sec = 0;
-    // 100 msec
-    tv.tv_usec = 100000;
-    retval = select(_maxFds + 1, &fds, NULL, NULL, &tv);
-    if (retval < 0)
+  int port = Port.getDefaultValue();
+  if (!context->getProperty(Port.getName(), port)) {
+    logger_->log_error("Missing or invalid Port: defaulting to %s", port);

Review Comment:
   The same concern applies here as in the case of the protocol.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to