Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 63d2358d3 -> 0e6357ff1


MINIFI-85: Add ListenSyslog processor

This closes #6.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/0e6357ff
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/0e6357ff
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/0e6357ff

Branch: refs/heads/master
Commit: 0e6357ff16ff3e329d8b85d0338ddf87b653d042
Parents: 63d2358
Author: Bin Qiu <[email protected]>
Authored: Sat Aug 20 11:11:43 2016 -0700
Committer: Aldrin Piri <[email protected]>
Committed: Thu Aug 25 14:45:26 2016 -0400

----------------------------------------------------------------------
 conf/flowListenSyslog.xml | 138 +++++++++++++++++
 inc/FlowControlProtocol.h |   2 +
 inc/FlowController.h      |   1 +
 inc/ListenSyslog.h        | 209 +++++++++++++++++++++++++
 src/FlowController.cpp    |   4 +
 src/ListenSyslog.cpp      | 345 +++++++++++++++++++++++++++++++++++++++++
 6 files changed, 699 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/conf/flowListenSyslog.xml
----------------------------------------------------------------------
diff --git a/conf/flowListenSyslog.xml b/conf/flowListenSyslog.xml
new file mode 100644
index 0000000..8539bef
--- /dev/null
+++ b/conf/flowListenSyslog.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<flowController>
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id>
+    <name>NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+    <processor>
+      <id>291ee60c-0b91-4524-88c0-d71ee2498e02</id>
+      <name>ListenSyslog</name>
+      <position x="2489.369384765625" y="788.25244140625"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.ListenSyslog</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Protocol</name>
+        <value>UDP</value>
+      </property>
+      <property>
+        <name>Port</name>
+        <value>514</value>
+      </property>
+      <property>
+        <name>SSL Context Service</name>
+      </property>
+      <property>
+        <name>Receive Buffer Size</name>
+        <value>65507 B</value>
+      </property>
+      <property>
+        <name>Max Size of Socket Buffer</name>
+        <value>1 MB</value>
+      </property>
+      <property>
+        <name>Max Number of TCP Connections</name>
+        <value>2</value>
+      </property>
+      <property>
+        <name>Max Batch Size</name>
+        <value>10</value>
+      </property>
+      <property>
+        <name>Message Delimiter</name>
+        <value>\n</value>
+      </property>
+      <property>
+        <name>Parse Messages</name>
+        <value>false</value>
+      </property>
+      <property>
+        <name>Character Set</name>
+        <value>UTF-8</value>
+      </property>
+      <autoTerminatedRelationship>invalid</autoTerminatedRelationship>
+    </processor>
+    <processor>
+      <id>12e3dece-dde5-44a2-8691-6d6bb2fab147</id>
+      <name>LogAttribute</name>
+      <position x="3236.369384765625" y="830.25244140625"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.LogAttribute</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Log Level</name>
+        <value>info</value>
+      </property>
+      <property>
+        <name>Log Payload</name>
+        <value>false</value>
+      </property>
+      <property>
+        <name>Attributes to Log</name>
+      </property>
+      <property>
+        <name>Attributes to Ignore</name>
+      </property>
+      <property>
+        <name>Log prefix</name>
+      </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+    <connection>
+      <id>c9e1cc50-2bc7-490d-9b5d-8c5dbc95a850</id>
+      <name/>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>291ee60c-0b91-4524-88c0-d71ee2498e02</sourceId>
+      <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+      <sourceType>PROCESSOR</sourceType>
+      <destinationId>12e3dece-dde5-44a2-8691-6d6bb2fab147</destinationId>
+      <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+      <destinationType>PROCESSOR</destinationType>
+      <relationship>success</relationship>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+  </rootGroup>
+  <controllerServices/>
+  <reportingTasks/>
+</flowController>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/inc/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/inc/FlowControlProtocol.h b/inc/FlowControlProtocol.h
index 24416f2..23f2d49 100644
--- a/inc/FlowControlProtocol.h
+++ b/inc/FlowControlProtocol.h
@@ -217,6 +217,8 @@ public:
                        close(_socket);
                if (_reportBlob)
                        delete [] _reportBlob;
+               if (this->_thread)
+                       delete this->_thread;
        }
 
 public:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/inc/FlowController.h
----------------------------------------------------------------------
diff --git a/inc/FlowController.h b/inc/FlowController.h
index 1d3b2f8..13f7dff 100644
--- a/inc/FlowController.h
+++ b/inc/FlowController.h
@@ -49,6 +49,7 @@
 #include "RemoteProcessorGroupPort.h"
 #include "GetFile.h"
 #include "TailFile.h"
+#include "ListenSyslog.h"
 
 //! Default NiFi Root Group Name
 #define DEFAULT_ROOT_GROUP_NAME ""

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/inc/ListenSyslog.h
----------------------------------------------------------------------
diff --git a/inc/ListenSyslog.h b/inc/ListenSyslog.h
new file mode 100644
index 0000000..81bc92c
--- /dev/null
+++ b/inc/ListenSyslog.h
@@ -0,0 +1,209 @@
+/**
+ * @file ListenSyslog.h
+ * ListenSyslog class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LISTEN_SYSLOG_H__
+#define __LISTEN_SYSLOG_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <chrono>
+#include <thread>
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! SyslogEvent: one received syslog message, queued by the socket thread until onTrigger() consumes it
+typedef struct {
+       uint8_t *payload; //!< message bytes; allocated with new[] in runThread() and released with delete[] in onTrigger()
+       uint64_t len; //!< number of bytes stored in payload
+} SysLogEvent;
+
+//! ListenSyslog Class
+/*!
+ * Processor that receives syslog messages over UDP or TCP on a background
+ * socket thread, stores them in an internal event queue, and turns batches
+ * of queued messages into FlowFiles in onTrigger().
+ */
+class ListenSyslog : public Processor
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor; members start at the same defaults as the
+        * static Property definitions (UDP, port 514, batch size 1, ...).
+        */
+       ListenSyslog(std::string name, uuid_t uuid = NULL)
+       : Processor(name, uuid)
+       {
+               _logger = Logger::getLogger();
+               _eventQueueByteSize = 0;
+               _serverSocket = 0;
+               _recvBufSize = 65507;
+               _maxSocketBufSize = 1024*1024;
+               _maxConnections = 2;
+               _maxBatchSize = 1;
+               _messageDelimiter = "\n";
+               _protocol = "UDP";
+               _port = 514;
+               _parseMessages = false;
+               _maxFds = 0;
+               FD_ZERO(&_readfds);
+               _thread = NULL;
+               _resetServerSocket = false;
+               _serverTheadRunning = false;
+       }
+       //! Destructor
+       /*!
+        * Signals the socket thread to stop, closes all sockets and frees any
+        * payloads that are still waiting in the event queue.
+        */
+       virtual ~ListenSyslog()
+       {
+               // Ask the socket thread to leave its loop on its next pass.
+               // NOTE(review): the thread is detached in startSocketThread(), so
+               // this destructor cannot wait for it -- confirm the thread can no
+               // longer touch this object once destruction has started.
+               _serverTheadRunning = false;
+               if (this->_thread)
+                       delete this->_thread;
+               // close every accepted client connection
+               for (std::vector<int>::iterator it = _clientSockets.begin();
+                               it != _clientSockets.end(); ++it)
+               {
+                       close(*it);
+               }
+               _clientSockets.clear();
+               if (_serverSocket > 0)
+               {
+                       _logger->log_info("ListenSysLog Server socket %d close", _serverSocket);
+                       close(_serverSocket);
+                       _serverSocket = 0;
+               }
+               // Payloads are allocated with new[] by the socket thread and are only
+               // freed once onTrigger() consumes them; release whatever is still queued.
+               std::lock_guard<std::mutex> lock(_mtx);
+               while (!_eventQueue.empty())
+               {
+                       delete[] _eventQueue.front().payload;
+                       _eventQueue.pop();
+               }
+               _eventQueueByteSize = 0;
+       }
+       //! Processor Name
+       static const std::string ProcessorName;
+       //! Supported Properties
+       static Property RecvBufSize;
+       static Property MaxSocketBufSize;
+       static Property MaxConnections;
+       static Property MaxBatchSize;
+       static Property MessageDelimiter;
+       static Property ParseMessages;
+       static Property Protocol;
+       static Property Port;
+       //! Supported Relationships
+       static Relationship Success;
+       static Relationship Invalid;
+       //! Nested callback class that writes a caller-owned byte buffer to the stream
+       class WriteCallback : public OutputStreamCallback
+       {
+               public:
+               WriteCallback(char *data, uint64_t size)
+               : _data(data), _dataSize(size) {}
+               //! Buffer to write; not owned by the callback
+               char *_data;
+               //! Number of bytes in _data
+               uint64_t _dataSize;
+               //! Write the buffer; does nothing for a NULL or empty buffer
+               void process(std::ofstream *stream) {
+                       if (_data && _dataSize > 0)
+                               stream->write(_data, _dataSize);
+               }
+       };
+
+public:
+       //! OnTrigger method, implemented by NiFi ListenSyslog
+       virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+       //! Initialize, over write by NiFi ListenSyslog
+       virtual void initialize(void);
+
+protected:
+
+private:
+       //! Logger
+       Logger *_logger;
+       //! Run function for the thread
+       static void run(ListenSyslog *process);
+       //! Run Thread
+       void runThread();
+       //! Queue for store syslog event
+       std::queue<SysLogEvent> _eventQueue;
+       //! Size of Event queue in bytes
+       uint64_t _eventQueueByteSize;
+       //! Get event queue size
+       uint64_t getEventQueueSize() {
+               std::lock_guard<std::mutex> lock(_mtx);
+               return _eventQueue.size();
+       }
+       //! Get event queue byte size
+       uint64_t getEventQueueByteSize() {
+               std::lock_guard<std::mutex> lock(_mtx);
+               return _eventQueueByteSize;
+       }
+       //! Whether the event queue  is empty
+       bool isEventQueueEmpty()
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               return _eventQueue.empty();
+       }
+       //! Put event into the event queue; the queue keeps the payload pointer, it does not copy the bytes
+       void putEvent(uint8_t *payload, uint64_t len)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               SysLogEvent event;
+               event.payload = payload;
+               event.len = len;
+               _eventQueue.push(event);
+               _eventQueueByteSize += len;
+       }
+       //! Read \n terminated line from TCP socket
+       int readline( int fd, char *bufptr, size_t len );
+       //! start server socket and handling client socket
+       void startSocketThread();
+       //! Poll event: move queued events into list until list holds maxSize entries (0 means no limit)
+       void pollEvent(std::queue<SysLogEvent> &list, int maxSize)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+
+               while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize))
+               {
+                       SysLogEvent event = _eventQueue.front();
+                       _eventQueue.pop();
+                       _eventQueueByteSize -= event.len;
+                       list.push(event);
+               }
+               return;
+       }
+       //! Mutex for protection of the event queue and its byte counter
+       std::mutex _mtx;
+       int64_t _recvBufSize;
+       int64_t _maxSocketBufSize;
+       int64_t _maxConnections;
+       int64_t _maxBatchSize;
+       std::string _messageDelimiter;
+       std::string _protocol;
+       int64_t _port;
+       bool _parseMessages;
+       //! Listening (TCP) or receiving (UDP) socket; 0 means not open
+       int _serverSocket;
+       //! Accepted TCP client sockets
+       std::vector<int> _clientSockets;
+       int _maxFds;
+       fd_set _readfds;
+       //! thread
+       std::thread *_thread;
+       //! whether to reset the server socket
+       bool _resetServerSocket;
+       //! loop flag for the socket thread; cleared by the destructor
+       bool _serverTheadRunning;
+       //! buffer for read socket
+       uint8_t _buffer[2048];
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index f53146e..c01c385 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -148,6 +148,10 @@ Processor *FlowController::createProcessor(std::string 
name, uuid_t uuid)
        {
                processor = new TailFile(name, uuid);
        }
+       else if (name == ListenSyslog::ProcessorName)
+       {
+               processor = new ListenSyslog(name, uuid);
+       }
        else
        {
                _logger->log_error("No Processor defined for %s", name.c_str());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/src/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/src/ListenSyslog.cpp b/src/ListenSyslog.cpp
new file mode 100644
index 0000000..090c988
--- /dev/null
+++ b/src/ListenSyslog.cpp
@@ -0,0 +1,345 @@
+/**
+ * @file ListenSyslog.cpp
+ * ListenSyslog class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <queue>
+#include <stdio.h>
+#include <string>
+#include "TimeUtil.h"
+#include "ListenSyslog.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Processor name, supported property definitions (name, description, default value)
+//! and relationship definitions (name, description) for ListenSyslog
+const std::string ListenSyslog::ProcessorName("ListenSyslog");
+Property ListenSyslog::RecvBufSize("Receive Buffer Size",
+               "The size of each buffer used to receive Syslog messages.", "65507 B");
+Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer",
+               "The maximum size of the socket buffer that should be used.", "1 MB");
+Property ListenSyslog::MaxConnections("Max Number of TCP Connections",
+               "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2");
+Property ListenSyslog::MaxBatchSize("Max Batch Size",
+               "The maximum number of Syslog events to add to a single FlowFile.", "1");
+Property ListenSyslog::MessageDelimiter("Message Delimiter",
+               "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).", "\n");
+// The original description stopped mid-sentence ("...will only."); it now names the
+// two attributes onTrigger() actually adds to each FlowFile.
+Property ListenSyslog::ParseMessages("Parse Messages",
+               "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only carry the syslog.protocol and syslog.port attributes.", "false");
+Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP");
+Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514");
+Relationship ListenSyslog::Success("success", "All files are routed to success");
+Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
+
+//! Register the properties and relationships this processor supports
+void ListenSyslog::initialize()
+{
+       //! Supported properties
+       std::set<Property> supportedProperties = {
+               RecvBufSize,
+               MaxSocketBufSize,
+               MaxConnections,
+               MaxBatchSize,
+               MessageDelimiter,
+               ParseMessages,
+               Protocol,
+               Port
+       };
+       setSupportedProperties(supportedProperties);
+
+       //! Supported relationships
+       std::set<Relationship> supportedRelationships = { Success, Invalid };
+       setSupportedRelationships(supportedRelationships);
+}
+
+//! Launch the detached socket thread on the first call; later calls do nothing
+void ListenSyslog::startSocketThread()
+{
+       if (_thread == NULL)
+       {
+               _logger->log_info("ListenSysLog Socket Thread Start");
+               // the flag must be set before the thread starts, runThread() loops on it
+               _serverTheadRunning = true;
+               _thread = new std::thread(run, this);
+               _thread->detach();
+       }
+}
+
+//! Static thread entry point: hands control to the given instance's runThread()
+void ListenSyslog::run(ListenSyslog *process)
+{
+       ListenSyslog &listener = *process;
+       listener.runThread();
+}
+
+//! Body of the socket thread
+/*!
+ * Loops while _serverTheadRunning is set. Each pass (re)creates the server
+ * socket when needed, waits up to 100 ms in select() for activity, then either
+ * accepts a TCP client, reads one UDP datagram, or reads one line from each
+ * readable TCP client. Every received message is copied to the heap and
+ * queued with putEvent(). The loop ends (and the thread exits) when the
+ * server socket cannot be created, bound or put into listening state, or when
+ * select() fails with anything other than EINTR.
+ */
+void ListenSyslog::runThread()
+{
+       while (_serverTheadRunning)
+       {
+               if (_resetServerSocket)
+               {
+                       _resetServerSocket = false;
+                       // protocol or port changed: drop every connection, the
+                       // server socket is re-created below
+                       std::vector<int>::iterator it;
+                       for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+                       {
+                               close(*it);
+                       }
+                       _clientSockets.clear();
+                       if (_serverSocket > 0)
+                       {
+                               close(_serverSocket);
+                               _serverSocket = 0;
+                       }
+               }
+
+               if (_serverSocket <= 0)
+               {
+                       uint16_t portno = _port;
+                       struct sockaddr_in serv_addr;
+                       int sockfd;
+                       if (_protocol == "TCP")
+                               sockfd = socket(AF_INET, SOCK_STREAM, 0);
+                       else
+                               sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+                       if (sockfd < 0)
+                       {
+                               _logger->log_error("ListenSysLog Server socket creation failed");
+                               break;
+                       }
+                       bzero((char *) &serv_addr, sizeof(serv_addr));
+                       serv_addr.sin_family = AF_INET;
+                       serv_addr.sin_addr.s_addr = INADDR_ANY;
+                       serv_addr.sin_port = htons(portno);
+                       if (bind(sockfd, (struct sockaddr *) &serv_addr,
+                                       sizeof(serv_addr)) < 0)
+                       {
+                               _logger->log_error("ListenSysLog Server socket bind failed");
+                               // do not leak the descriptor when leaving the thread
+                               close(sockfd);
+                               break;
+                       }
+                       if (_protocol == "TCP" && listen(sockfd, 5) < 0)
+                       {
+                               _logger->log_error("ListenSysLog Server socket listen failed");
+                               close(sockfd);
+                               break;
+                       }
+                       _serverSocket = sockfd;
+                       _logger->log_info("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
+               }
+
+               // rebuild the read set: server socket plus every client socket
+               FD_ZERO(&_readfds);
+               FD_SET(_serverSocket, &_readfds);
+               _maxFds = _serverSocket;
+               std::vector<int>::iterator it;
+               for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+               {
+                       int clientSocket = *it;
+                       if (clientSocket >= _maxFds)
+                               _maxFds = clientSocket;
+                       FD_SET(clientSocket, &_readfds);
+               }
+               fd_set fds = _readfds;
+               struct timeval tv;
+               tv.tv_sec = 0;
+               // 100 msec, so the loop re-checks its run/reset flags regularly
+               tv.tv_usec = 100000;
+               int retval = select(_maxFds+1, &fds, NULL, NULL, &tv);
+               if (retval < 0)
+               {
+                       // an interrupting signal is not a socket failure
+                       if (errno == EINTR)
+                               continue;
+                       break;
+               }
+               if (retval == 0)
+                       continue;
+               if (FD_ISSET(_serverSocket, &fds))
+               {
+                       // server socket, either we have UDP datagram or TCP connection request
+                       if (_protocol == "TCP")
+                       {
+                               socklen_t clilen;
+                               struct sockaddr_in cli_addr;
+                               clilen = sizeof(cli_addr);
+                               int newsockfd = accept(_serverSocket,
+                                               (struct sockaddr *) &cli_addr,
+                                               &clilen);
+                               if (newsockfd > 0)
+                               {
+                                       if (_clientSockets.size() < _maxConnections)
+                                       {
+                                               _clientSockets.push_back(newsockfd);
+                                               _logger->log_info("ListenSysLog new client socket %d connection", newsockfd);
+                                               continue;
+                                       }
+                                       else
+                                       {
+                                               // connection limit reached: refuse the client
+                                               close(newsockfd);
+                                       }
+                               }
+                       }
+                       else
+                       {
+                               socklen_t clilen;
+                               struct sockaddr_in cli_addr;
+                               clilen = sizeof(cli_addr);
+                               int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
+                                               (struct sockaddr *)&cli_addr, &clilen);
+                               // the datagram is dropped when queueing it would push the
+                               // queued byte total past _recvBufSize
+                               if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize)
+                               {
+                                       uint8_t *payload = new uint8_t[recvlen];
+                                       memcpy(payload, _buffer, recvlen);
+                                       putEvent(payload, recvlen);
+                               }
+                       }
+               }
+               it = _clientSockets.begin();
+               while (it != _clientSockets.end())
+               {
+                       int clientSocket = *it;
+                       if (!FD_ISSET(clientSocket, &fds))
+                       {
+                               // nothing to read on this connection; the iterator must
+                               // still advance or this loop never terminates
+                               ++it;
+                               continue;
+                       }
+                       int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer));
+                       if (recvlen <= 0)
+                       {
+                               // peer closed, read error, or line too long for _buffer
+                               close(clientSocket);
+                               _logger->log_info("ListenSysLog client socket %d close", clientSocket);
+                               it = _clientSockets.erase(it);
+                               continue;
+                       }
+                       if ((recvlen + getEventQueueByteSize()) <= _recvBufSize)
+                       {
+                               uint8_t *payload = new uint8_t[recvlen];
+                               memcpy(payload, _buffer, recvlen);
+                               putEvent(payload, recvlen);
+                       }
+                       ++it;
+               }
+       }
+       return;
+}
+
+
+int ListenSyslog::readline( int fd, char *bufptr, size_t len ) // copies one '\n'-terminated line from fd into bufptr (capacity len); returns the byte count stored, 0 when recv() reports 0, -1 on a recv() error or when no '\n' fits
+{
+       char *bufx = bufptr; // start of the caller's buffer, used to compute the returned length
+       static char *bp; // next unread byte inside b
+       static int cnt = 0; // bookkeeping for how much of b is still unread
+       static char b[ 2048 ]; // read-ahead buffer; NOTE(review): function-static, so leftover bytes read from one fd are handed to the next call even for a different fd or instance -- confirm this is acceptable with more than one TCP client
+       char c;
+
+       while ( --len > 0 ) // pre-decrement leaves one spare byte for the extra '\n' stored below
+    {
+      if ( --cnt <= 0 ) // read-ahead buffer used up: refill it from the socket
+      {
+         cnt = recv( fd, b, sizeof( b ), 0 );
+         if ( cnt < 0 )
+         {
+                 if ( errno == EINTR ) // interrupted: retry the recv() without consuming buffer space
+                 {
+                         len++;                /* the while will decrement */
+                         continue;
+                 }
+                 return -1;
+         }
+         if ( cnt == 0 ) // recv() returned 0; bytes already copied for this line are not reported to the caller
+                 return 0;
+         bp = b;
+      }
+      c = *bp++;
+      *bufptr++ = c;
+      if ( c == '\n' )
+      {
+         *bufptr = '\n'; // a second '\n' is stored after the line's own newline; no NUL terminator is written
+         return bufptr - bufx + 1; // length includes the line, its newline and the extra '\n'
+      }
+    }
+       return -1; // ran out of room in bufptr before a '\n' arrived
+}
+
+//! OnTrigger: refresh settings, make sure the socket thread runs, emit one FlowFile
+/*!
+ * Re-reads every supported property from the context, asks the socket thread
+ * to rebind when Protocol or Port changed, and starts that thread on first use.
+ * Then takes up to _maxBatchSize queued events (0 means all), writes them back
+ * to back into a single FlowFile, tags it with syslog.protocol and syslog.port
+ * and transfers it to Success. Yields when no event is available.
+ *
+ * @param context source of the property values; yielded when there is no work
+ * @param session used to create, write, append and transfer the FlowFile
+ */
+void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       std::string value;
+       bool needResetServerSocket = false;
+       if (context->getProperty(Protocol.getName(), value))
+       {
+               if (_protocol != value)
+                       needResetServerSocket = true;
+               _protocol = value;
+       }
+       if (context->getProperty(RecvBufSize.getName(), value))
+       {
+               Property::StringToInt(value, _recvBufSize);
+       }
+       if (context->getProperty(MaxSocketBufSize.getName(), value))
+       {
+               Property::StringToInt(value, _maxSocketBufSize);
+       }
+       if (context->getProperty(MaxConnections.getName(), value))
+       {
+               Property::StringToInt(value, _maxConnections);
+       }
+       if (context->getProperty(MessageDelimiter.getName(), value))
+       {
+               _messageDelimiter = value;
+       }
+       if (context->getProperty(ParseMessages.getName(), value))
+       {
+               Property::StringToBool(value, _parseMessages);
+       }
+       if (context->getProperty(Port.getName(), value))
+       {
+               int64_t oldPort = _port;
+               Property::StringToInt(value, _port);
+               if (_port != oldPort)
+                       needResetServerSocket = true;
+       }
+       if (context->getProperty(MaxBatchSize.getName(), value))
+       {
+               Property::StringToInt(value, _maxBatchSize);
+       }
+
+       // the socket thread picks this flag up at the top of its loop
+       if (needResetServerSocket)
+               _resetServerSocket = true;
+
+       startSocketThread();
+
+       // read from the event queue
+       if (isEventQueueEmpty())
+       {
+               context->yield();
+               return;
+       }
+
+       std::queue<SysLogEvent> eventQueue;
+       pollEvent(eventQueue, _maxBatchSize);
+       if (eventQueue.empty())
+       {
+               // the queue was drained between the check above and the poll;
+               // without this guard flowFile would be used while still NULL
+               context->yield();
+               return;
+       }
+
+       FlowFileRecord *flowFile = session->create();
+       if (!flowFile)
+       {
+               // the polled events cannot be delivered; free their payloads
+               // instead of leaking them
+               _logger->log_error("ListenSysLog failed to create flow file, dropping %d events", (int) eventQueue.size());
+               while (!eventQueue.empty())
+               {
+                       delete[] eventQueue.front().payload;
+                       eventQueue.pop();
+               }
+               return;
+       }
+
+       bool firstEvent = true;
+       while(!eventQueue.empty())
+       {
+               SysLogEvent event = eventQueue.front();
+               eventQueue.pop();
+               ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
+               if (firstEvent)
+               {
+                       // the first event creates the content, later ones extend it
+                       session->write(flowFile, &callback);
+                       firstEvent = false;
+               }
+               else
+               {
+                       // the Message Delimiter is not written between events yet:
+                       /*
+                       ListenSyslog::WriteCallback callbackSep((char *)_messageDelimiter.data(), _messageDelimiter.size());
+                       session->append(flowFile, &callbackSep); */
+                       session->append(flowFile, &callback);
+               }
+               // the payload has been copied into the FlowFile content
+               delete[] event.payload;
+       }
+       flowFile->addAttribute("syslog.protocol", _protocol);
+       flowFile->addAttribute("syslog.port", std::to_string(_port));
+       session->transfer(flowFile, Success);
+}

Reply via email to