Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 63d2358d3 -> 0e6357ff1
MINIFI-85: Add ListenSyslog processor This closes #6. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/0e6357ff Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/0e6357ff Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/0e6357ff Branch: refs/heads/master Commit: 0e6357ff16ff3e329d8b85d0338ddf87b653d042 Parents: 63d2358 Author: Bin Qiu <[email protected]> Authored: Sat Aug 20 11:11:43 2016 -0700 Committer: Aldrin Piri <[email protected]> Committed: Thu Aug 25 14:45:26 2016 -0400 ---------------------------------------------------------------------- conf/flowListenSyslog.xml | 138 +++++++++++++++++ inc/FlowControlProtocol.h | 2 + inc/FlowController.h | 1 + inc/ListenSyslog.h | 209 +++++++++++++++++++++++++ src/FlowController.cpp | 4 + src/ListenSyslog.cpp | 345 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 699 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/conf/flowListenSyslog.xml ---------------------------------------------------------------------- diff --git a/conf/flowListenSyslog.xml b/conf/flowListenSyslog.xml new file mode 100644 index 0000000..8539bef --- /dev/null +++ b/conf/flowListenSyslog.xml @@ -0,0 +1,138 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + <processor> + <id>291ee60c-0b91-4524-88c0-d71ee2498e02</id> + <name>ListenSyslog</name> + <position x="2489.369384765625" y="788.25244140625"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.ListenSyslog</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Protocol</name> + <value>UDP</value> + </property> + <property> + <name>Port</name> + <value>514</value> + </property> + <property> + <name>SSL Context Service</name> + </property> + <property> + <name>Receive Buffer Size</name> + <value>65507 B</value> + </property> + <property> + <name>Max Size of Socket Buffer</name> + <value>1 MB</value> + </property> + <property> + <name>Max Number of TCP Connections</name> + <value>2</value> + </property> + <property> + <name>Max Batch Size</name> + <value>10</value> + </property> + <property> + <name>Message Delimiter</name> + <value>\n</value> + </property> + <property> + <name>Parse Messages</name> + <value>false</value> + </property> + <property> + <name>Character Set</name> + <value>UTF-8</value> + </property> + <autoTerminatedRelationship>invalid</autoTerminatedRelationship> + </processor> + <processor> + <id>12e3dece-dde5-44a2-8691-6d6bb2fab147</id> + <name>LogAttribute</name> + <position x="3236.369384765625" y="830.25244140625"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.LogAttribute</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Log Level</name> + <value>info</value> + </property> + <property> + <name>Log Payload</name> + <value>false</value> + </property> + <property> + <name>Attributes to Log</name> + </property> + <property> + <name>Attributes to Ignore</name> + </property> + <property> + <name>Log prefix</name> + </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + </processor> + <connection> + <id>c9e1cc50-2bc7-490d-9b5d-8c5dbc95a850</id> + <name/> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>291ee60c-0b91-4524-88c0-d71ee2498e02</sourceId> + <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId> + <sourceType>PROCESSOR</sourceType> + <destinationId>12e3dece-dde5-44a2-8691-6d6bb2fab147</destinationId> + <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId> + <destinationType>PROCESSOR</destinationType> + <relationship>success</relationship> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + </rootGroup> + <controllerServices/> + <reportingTasks/> +</flowController> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/inc/FlowControlProtocol.h ---------------------------------------------------------------------- diff --git a/inc/FlowControlProtocol.h b/inc/FlowControlProtocol.h index 24416f2..23f2d49 100644 --- a/inc/FlowControlProtocol.h +++ b/inc/FlowControlProtocol.h @@ -217,6 +217,8 @@ public: close(_socket); if (_reportBlob) delete [] _reportBlob; + if (this->_thread) + delete this->_thread; } public: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/inc/FlowController.h ---------------------------------------------------------------------- diff --git a/inc/FlowController.h b/inc/FlowController.h index 1d3b2f8..13f7dff 100644 --- a/inc/FlowController.h +++ b/inc/FlowController.h @@ -49,6 +49,7 @@ #include "RemoteProcessorGroupPort.h" #include "GetFile.h" #include "TailFile.h" +#include "ListenSyslog.h" //! Default NiFi Root Group Name #define DEFAULT_ROOT_GROUP_NAME "" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/inc/ListenSyslog.h ---------------------------------------------------------------------- diff --git a/inc/ListenSyslog.h b/inc/ListenSyslog.h new file mode 100644 index 0000000..81bc92c --- /dev/null +++ b/inc/ListenSyslog.h @@ -0,0 +1,209 @@ +/** + * @file ListenSyslog.h + * ListenSyslog class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __LISTEN_SYSLOG_H__ +#define __LISTEN_SYSLOG_H__ + +#include <stdio.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <errno.h> +#include <sys/select.h> +#include <sys/time.h> +#include <sys/types.h> +#include <chrono> +#include <thread> +#include "FlowFileRecord.h" +#include "Processor.h" +#include "ProcessSession.h" + +//! SyslogEvent +typedef struct { + uint8_t *payload; + uint64_t len; +} SysLogEvent; + +//! ListenSyslog Class +class ListenSyslog : public Processor +{ +public: + //! Constructor + /*! + * Create a new processor + */ + ListenSyslog(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) + { + _logger = Logger::getLogger(); + _eventQueueByteSize = 0; + _serverSocket = 0; + _recvBufSize = 65507; + _maxSocketBufSize = 1024*1024; + _maxConnections = 2; + _maxBatchSize = 1; + _messageDelimiter = "\n"; + _protocol = "UDP"; + _port = 514; + _parseMessages = false; + _serverSocket = 0; + _maxFds = 0; + FD_ZERO(&_readfds); + _thread = NULL; + _resetServerSocket = false; + _serverTheadRunning = false; + } + //! Destructor + virtual ~ListenSyslog() + { + _serverTheadRunning = false; + if (this->_thread) + delete this->_thread; + // need to reset the socket + std::vector<int>::iterator it; + for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) + { + int clientSocket = *it; + close(clientSocket); + } + _clientSockets.clear(); + if (_serverSocket > 0) + { + _logger->log_info("ListenSysLog Server socket %d close", _serverSocket); + close(_serverSocket); + _serverSocket = 0; + } + } + //! Processor Name + static const std::string ProcessorName; + //! Supported Properties + static Property RecvBufSize; + static Property MaxSocketBufSize; + static Property MaxConnections; + static Property MaxBatchSize; + static Property MessageDelimiter; + static Property ParseMessages; + static Property Protocol; + static Property Port; + //! Supported Relationships + static Relationship Success; + static Relationship Invalid; + //! Nest Callback Class for write stream + class WriteCallback : public OutputStreamCallback + { + public: + WriteCallback(char *data, uint64_t size) + : _data(data), _dataSize(size) {} + char *_data; + uint64_t _dataSize; + void process(std::ofstream *stream) { + if (_data && _dataSize > 0) + stream->write(_data, _dataSize); + } + }; + +public: + //! OnTrigger method, implemented by NiFi ListenSyslog + virtual void onTrigger(ProcessContext *context, ProcessSession *session); + //! Initialize, over write by NiFi ListenSyslog + virtual void initialize(void); + +protected: + +private: + //! Logger + Logger *_logger; + //! Run function for the thread + static void run(ListenSyslog *process); + //! Run Thread + void runThread(); + //! Queue for store syslog event + std::queue<SysLogEvent> _eventQueue; + //! Size of Event queue in bytes + uint64_t _eventQueueByteSize; + //! Get event queue size + uint64_t getEventQueueSize() { + std::lock_guard<std::mutex> lock(_mtx); + return _eventQueue.size(); + } + //! Get event queue byte size + uint64_t getEventQueueByteSize() { + std::lock_guard<std::mutex> lock(_mtx); + return _eventQueueByteSize; + } + //! Whether the event queue is empty + bool isEventQueueEmpty() + { + std::lock_guard<std::mutex> lock(_mtx); + return _eventQueue.empty(); + } + //! Put event into directory listing + void putEvent(uint8_t *payload, uint64_t len) + { + std::lock_guard<std::mutex> lock(_mtx); + SysLogEvent event; + event.payload = payload; + event.len = len; + _eventQueue.push(event); + _eventQueueByteSize += len; + } + //! Read \n terminated line from TCP socket + int readline( int fd, char *bufptr, size_t len ); + //! start server socket and handling client socket + void startSocketThread(); + //! Poll event + void pollEvent(std::queue<SysLogEvent> &list, int maxSize) + { + std::lock_guard<std::mutex> lock(_mtx); + + while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize)) + { + SysLogEvent event = _eventQueue.front(); + _eventQueue.pop(); + _eventQueueByteSize -= event.len; + list.push(event); + } + return; + } + //! Mutex for protection of the directory listing + std::mutex _mtx; + int64_t _recvBufSize; + int64_t _maxSocketBufSize; + int64_t _maxConnections; + int64_t _maxBatchSize; + std::string _messageDelimiter; + std::string _protocol; + int64_t _port; + bool _parseMessages; + int _serverSocket; + std::vector<int> _clientSockets; + int _maxFds; + fd_set _readfds; + //! thread + std::thread *_thread; + //! whether to reset the server socket + bool _resetServerSocket; + bool _serverTheadRunning; + //! buffer for read socket + uint8_t _buffer[2048]; +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/src/FlowController.cpp b/src/FlowController.cpp index f53146e..c01c385 100644 --- a/src/FlowController.cpp +++ b/src/FlowController.cpp @@ -148,6 +148,10 @@ Processor *FlowController::createProcessor(std::string name, uuid_t uuid) { processor = new TailFile(name, uuid); } + else if (name == ListenSyslog::ProcessorName) + { + processor = new ListenSyslog(name, uuid); + } else { _logger->log_error("No Processor defined for %s", name.c_str()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0e6357ff/src/ListenSyslog.cpp ---------------------------------------------------------------------- diff --git a/src/ListenSyslog.cpp b/src/ListenSyslog.cpp new file mode 100644 index 0000000..090c988 --- /dev/null +++ b/src/ListenSyslog.cpp @@ -0,0 +1,345 @@ +/** + * @file ListenSyslog.cpp + * ListenSyslog class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <queue> +#include <stdio.h> +#include <string> +#include "TimeUtil.h" +#include "ListenSyslog.h" +#include "ProcessContext.h" +#include "ProcessSession.h" + +const std::string ListenSyslog::ProcessorName("ListenSyslog"); +Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B"); +Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB"); +Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2"); +Property ListenSyslog::MaxBatchSize("Max Batch Size", + "The maximum number of Syslog events to add to a single FlowFile.", "1"); +Property ListenSyslog::MessageDelimiter("Message Delimiter", + "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).", "\n"); +Property ListenSyslog::ParseMessages("Parse Messages", + "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false"); +Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP"); +Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514"); +Relationship ListenSyslog::Success("success", "All files are routed to success"); +Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid"); + +void ListenSyslog::initialize() +{ + //! Set the supported properties + std::set<Property> properties; + properties.insert(RecvBufSize); + properties.insert(MaxSocketBufSize); + properties.insert(MaxConnections); + properties.insert(MaxBatchSize); + properties.insert(MessageDelimiter); + properties.insert(ParseMessages); + properties.insert(Protocol); + properties.insert(Port); + setSupportedProperties(properties); + //! Set the supported relationships + std::set<Relationship> relationships; + relationships.insert(Success); + relationships.insert(Invalid); + setSupportedRelationships(relationships); +} + +void ListenSyslog::startSocketThread() +{ + if (_thread != NULL) + return; + + _logger->log_info("ListenSysLog Socket Thread Start"); + _serverTheadRunning = true; + _thread = new std::thread(run, this); + _thread->detach(); +} + +void ListenSyslog::run(ListenSyslog *process) +{ + process->runThread(); +} + +void ListenSyslog::runThread() +{ + while (_serverTheadRunning) + { + if (_resetServerSocket) + { + _resetServerSocket = false; + // need to reset the socket + std::vector<int>::iterator it; + for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) + { + int clientSocket = *it; + close(clientSocket); + } + _clientSockets.clear(); + if (_serverSocket > 0) + { + close(_serverSocket); + _serverSocket = 0; + } + } + + if (_serverSocket <= 0) + { + uint16_t portno = _port; + struct sockaddr_in serv_addr; + int sockfd; + if (_protocol == "TCP") + sockfd = socket(AF_INET, SOCK_STREAM, 0); + else + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (sockfd < 0) + { + _logger->log_info("ListenSysLog Server socket creation failed"); + break; + } + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(portno); + if (bind(sockfd, (struct sockaddr *) &serv_addr, + sizeof(serv_addr)) < 0) + { + _logger->log_error("ListenSysLog Server socket bind failed"); + break; + } + if (_protocol == "TCP") + listen(sockfd,5); + _serverSocket = sockfd; + _logger->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno); + } + FD_ZERO(&_readfds); + FD_SET(_serverSocket, &_readfds); + _maxFds = _serverSocket; + std::vector<int>::iterator it; + for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) + { + int clientSocket = *it; + if (clientSocket >= _maxFds) + _maxFds = clientSocket; + FD_SET(clientSocket, &_readfds); + } + fd_set fds; + struct timeval tv; + int retval; + fds = _readfds; + tv.tv_sec = 0; + // 100 msec + tv.tv_usec = 100000; + retval = select(_maxFds+1, &fds, NULL, NULL, &tv); + if (retval < 0) + break; + if (retval == 0) + continue; + if (FD_ISSET(_serverSocket, &fds)) + { + // server socket, either we have UDP datagram or TCP connection request + if (_protocol == "TCP") + { + socklen_t clilen; + struct sockaddr_in cli_addr; + clilen = sizeof(cli_addr); + int newsockfd = accept(_serverSocket, + (struct sockaddr *) &cli_addr, + &clilen); + if (newsockfd > 0) + { + if (_clientSockets.size() < _maxConnections) + { + _clientSockets.push_back(newsockfd); + _logger->log_info("ListenSysLog new client socket %d connection", newsockfd); + continue; + } + else + { + close(newsockfd); + } + } + } + else + { + socklen_t clilen; + struct sockaddr_in cli_addr; + clilen = sizeof(cli_addr); + int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, + (struct sockaddr *)&cli_addr, &clilen); + if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize) + { + uint8_t *payload = new uint8_t[recvlen]; + memcpy(payload, _buffer, recvlen); + putEvent(payload, recvlen); + } + } + } + it = _clientSockets.begin(); + while (it != _clientSockets.end()) + { + int clientSocket = *it; + if (FD_ISSET(clientSocket, &fds)) + { + int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer)); + if (recvlen <= 0) + { + close(clientSocket); + _logger->log_info("ListenSysLog client socket %d close", clientSocket); + it = _clientSockets.erase(it); + } + else + { + if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) + { + uint8_t *payload = new uint8_t[recvlen]; + memcpy(payload, _buffer, recvlen); + putEvent(payload, recvlen); + } + ++it; + } + } + } + } + return; +} + + +int ListenSyslog::readline( int fd, char *bufptr, size_t len ) +{ + char *bufx = bufptr; + static char *bp; + static int cnt = 0; + static char b[ 2048 ]; + char c; + + while ( --len > 0 ) + { + if ( --cnt <= 0 ) + { + cnt = recv( fd, b, sizeof( b ), 0 ); + if ( cnt < 0 ) + { + if ( errno == EINTR ) + { + len++; /* the while will decrement */ + continue; + } + return -1; + } + if ( cnt == 0 ) + return 0; + bp = b; + } + c = *bp++; + *bufptr++ = c; + if ( c == '\n' ) + { + *bufptr = '\n'; + return bufptr - bufx + 1; + } + } + return -1; +} + +void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session) +{ + std::string value; + bool needResetServerSocket = false; + if (context->getProperty(Protocol.getName(), value)) + { + if (_protocol != value) + needResetServerSocket = true; + _protocol = value; + } + if (context->getProperty(RecvBufSize.getName(), value)) + { + Property::StringToInt(value, _recvBufSize); + } + if (context->getProperty(MaxSocketBufSize.getName(), value)) + { + Property::StringToInt(value, _maxSocketBufSize); + } + if (context->getProperty(MaxConnections.getName(), value)) + { + Property::StringToInt(value, _maxConnections); + } + if (context->getProperty(MessageDelimiter.getName(), value)) + { + _messageDelimiter = value; + } + if (context->getProperty(ParseMessages.getName(), value)) + { + Property::StringToBool(value, _parseMessages); + } + if (context->getProperty(Port.getName(), value)) + { + int64_t oldPort = _port; + Property::StringToInt(value, _port); + if (_port != oldPort) + needResetServerSocket = true; + } + if (context->getProperty(MaxBatchSize.getName(), value)) + { + Property::StringToInt(value, _maxBatchSize); + } + + if (needResetServerSocket) + _resetServerSocket = true; + + startSocketThread(); + + // read from the event queue + if (isEventQueueEmpty()) + { + context->yield(); + return; + } + + std::queue<SysLogEvent> eventQueue; + pollEvent(eventQueue, _maxBatchSize); + bool firstEvent = true; + FlowFileRecord *flowFile = NULL; + while(!eventQueue.empty()) + { + SysLogEvent event = eventQueue.front(); + eventQueue.pop(); + if (firstEvent) + { + flowFile = session->create(); + if (!flowFile) + return; + ListenSyslog::WriteCallback callback((char *)event.payload, event.len); + session->write(flowFile, &callback); + delete[] event.payload; + firstEvent = false; + } + else + { + /* + ListenSyslog::WriteCallback callbackSep((char *)_messageDelimiter.data(), _messageDelimiter.size()); + session->append(flowFile, &callbackSep); */ + ListenSyslog::WriteCallback callback((char *)event.payload, event.len); + session->append(flowFile, &callback); + delete[] event.payload; + } + } + flowFile->addAttribute("syslog.protocol", _protocol); + flowFile->addAttribute("syslog.port", std::to_string(_port)); + session->transfer(flowFile, Success); +}
