http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ListenSyslog.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ListenSyslog.cpp b/libminifi/src/ListenSyslog.cpp new file mode 100644 index 0000000..ace37d7 --- /dev/null +++ b/libminifi/src/ListenSyslog.cpp @@ -0,0 +1,342 @@ +/** + * @file ListenSyslog.cpp + * ListenSyslog class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <queue> +#include <stdio.h> +#include <string> +#include "TimeUtil.h" +#include "ListenSyslog.h" +#include "ProcessContext.h" +#include "ProcessSession.h" + +const std::string ListenSyslog::ProcessorName("ListenSyslog"); +Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B"); +Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB"); +Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2"); +Property ListenSyslog::MaxBatchSize("Max Batch Size", + "The maximum number of Syslog events to add to a single FlowFile.", "1"); +Property ListenSyslog::MessageDelimiter("Message Delimiter", + "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).", "\n"); +Property ListenSyslog::ParseMessages("Parse Messages", + "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false"); +Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP"); +Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514"); +Relationship ListenSyslog::Success("success", "All files are routed to success"); +Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid"); + +void ListenSyslog::initialize() +{ + //! Set the supported properties + std::set<Property> properties; + properties.insert(RecvBufSize); + properties.insert(MaxSocketBufSize); + properties.insert(MaxConnections); + properties.insert(MaxBatchSize); + properties.insert(MessageDelimiter); + properties.insert(ParseMessages); + properties.insert(Protocol); + properties.insert(Port); + setSupportedProperties(properties); + //! 
Set the supported relationships + std::set<Relationship> relationships; + relationships.insert(Success); + relationships.insert(Invalid); + setSupportedRelationships(relationships); +} + +void ListenSyslog::startSocketThread() +{ + if (_thread != NULL) + return; + + _logger->log_info("ListenSysLog Socket Thread Start"); + _serverTheadRunning = true; + _thread = new std::thread(run, this); + _thread->detach(); +} + +void ListenSyslog::run(ListenSyslog *process) +{ + process->runThread(); +} + +void ListenSyslog::runThread() +{ + while (_serverTheadRunning) + { + if (_resetServerSocket) + { + _resetServerSocket = false; + // need to reset the socket + std::vector<int>::iterator it; + for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) + { + int clientSocket = *it; + close(clientSocket); + } + _clientSockets.clear(); + if (_serverSocket > 0) + { + close(_serverSocket); + _serverSocket = 0; + } + } + + if (_serverSocket <= 0) + { + uint16_t portno = _port; + struct sockaddr_in serv_addr; + int sockfd; + if (_protocol == "TCP") + sockfd = socket(AF_INET, SOCK_STREAM, 0); + else + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (sockfd < 0) + { + _logger->log_info("ListenSysLog Server socket creation failed"); + break; + } + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(portno); + if (bind(sockfd, (struct sockaddr *) &serv_addr, + sizeof(serv_addr)) < 0) + { + _logger->log_error("ListenSysLog Server socket bind failed"); + break; + } + if (_protocol == "TCP") + listen(sockfd,5); + _serverSocket = sockfd; + _logger->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno); + } + FD_ZERO(&_readfds); + FD_SET(_serverSocket, &_readfds); + _maxFds = _serverSocket; + std::vector<int>::iterator it; + for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) + { + int clientSocket = *it; + if (clientSocket >= _maxFds) + 
_maxFds = clientSocket; + FD_SET(clientSocket, &_readfds); + } + fd_set fds; + struct timeval tv; + int retval; + fds = _readfds; + tv.tv_sec = 0; + // 100 msec + tv.tv_usec = 100000; + retval = select(_maxFds+1, &fds, NULL, NULL, &tv); + if (retval < 0) + break; + if (retval == 0) + continue; + if (FD_ISSET(_serverSocket, &fds)) + { + // server socket, either we have UDP datagram or TCP connection request + if (_protocol == "TCP") + { + socklen_t clilen; + struct sockaddr_in cli_addr; + clilen = sizeof(cli_addr); + int newsockfd = accept(_serverSocket, + (struct sockaddr *) &cli_addr, + &clilen); + if (newsockfd > 0) + { + if (_clientSockets.size() < _maxConnections) + { + _clientSockets.push_back(newsockfd); + _logger->log_info("ListenSysLog new client socket %d connection", newsockfd); + continue; + } + else + { + close(newsockfd); + } + } + } + else + { + socklen_t clilen; + struct sockaddr_in cli_addr; + clilen = sizeof(cli_addr); + int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, + (struct sockaddr *)&cli_addr, &clilen); + if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize) + { + uint8_t *payload = new uint8_t[recvlen]; + memcpy(payload, _buffer, recvlen); + putEvent(payload, recvlen); + } + } + } + it = _clientSockets.begin(); + while (it != _clientSockets.end()) + { + int clientSocket = *it; + if (FD_ISSET(clientSocket, &fds)) + { + int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer)); + if (recvlen <= 0) + { + close(clientSocket); + _logger->log_info("ListenSysLog client socket %d close", clientSocket); + it = _clientSockets.erase(it); + } + else + { + if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) + { + uint8_t *payload = new uint8_t[recvlen]; + memcpy(payload, _buffer, recvlen); + putEvent(payload, recvlen); + } + ++it; + } + } + } + } + return; +} + + +int ListenSyslog::readline( int fd, char *bufptr, size_t len ) +{ + char *bufx = bufptr; + static char *bp; + static int cnt = 0; + static 
char b[ 2048 ]; + char c; + + while ( --len > 0 ) + { + if ( --cnt <= 0 ) + { + cnt = recv( fd, b, sizeof( b ), 0 ); + if ( cnt < 0 ) + { + if ( errno == EINTR ) + { + len++; /* the while will decrement */ + continue; + } + return -1; + } + if ( cnt == 0 ) + return 0; + bp = b; + } + c = *bp++; + *bufptr++ = c; + if ( c == '\n' ) + { + *bufptr = '\n'; + return bufptr - bufx + 1; + } + } + return -1; +} + +void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session) +{ + std::string value; + bool needResetServerSocket = false; + if (context->getProperty(Protocol.getName(), value)) + { + if (_protocol != value) + needResetServerSocket = true; + _protocol = value; + } + if (context->getProperty(RecvBufSize.getName(), value)) + { + Property::StringToInt(value, _recvBufSize); + } + if (context->getProperty(MaxSocketBufSize.getName(), value)) + { + Property::StringToInt(value, _maxSocketBufSize); + } + if (context->getProperty(MaxConnections.getName(), value)) + { + Property::StringToInt(value, _maxConnections); + } + if (context->getProperty(MessageDelimiter.getName(), value)) + { + _messageDelimiter = value; + } + if (context->getProperty(ParseMessages.getName(), value)) + { + Property::StringToBool(value, _parseMessages); + } + if (context->getProperty(Port.getName(), value)) + { + int64_t oldPort = _port; + Property::StringToInt(value, _port); + if (_port != oldPort) + needResetServerSocket = true; + } + if (context->getProperty(MaxBatchSize.getName(), value)) + { + Property::StringToInt(value, _maxBatchSize); + } + + if (needResetServerSocket) + _resetServerSocket = true; + + startSocketThread(); + + // read from the event queue + if (isEventQueueEmpty()) + { + context->yield(); + return; + } + + std::queue<SysLogEvent> eventQueue; + pollEvent(eventQueue, _maxBatchSize); + bool firstEvent = true; + FlowFileRecord *flowFile = NULL; + while(!eventQueue.empty()) + { + SysLogEvent event = eventQueue.front(); + eventQueue.pop(); + if (firstEvent) + { + 
flowFile = session->create(); + if (!flowFile) + return; + ListenSyslog::WriteCallback callback((char *)event.payload, event.len); + session->write(flowFile, &callback); + delete[] event.payload; + firstEvent = false; + } + else + { + ListenSyslog::WriteCallback callback((char *)event.payload, event.len); + session->append(flowFile, &callback); + delete[] event.payload; + } + } + flowFile->addAttribute("syslog.protocol", _protocol); + flowFile->addAttribute("syslog.port", std::to_string(_port)); + session->transfer(flowFile, Success); +}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/LogAttribute.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/LogAttribute.cpp b/libminifi/src/LogAttribute.cpp new file mode 100644 index 0000000..82130f8 --- /dev/null +++ b/libminifi/src/LogAttribute.cpp @@ -0,0 +1,158 @@ +/** + * @file LogAttribute.cpp + * LogAttribute class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <sstream> +#include <string.h> +#include <iostream> + +#include "TimeUtil.h" +#include "LogAttribute.h" +#include "ProcessContext.h" +#include "ProcessSession.h" + +const std::string LogAttribute::ProcessorName("LogAttribute"); +Property LogAttribute::LogLevel("Log Level", "The Log Level to use when logging the Attributes", "info"); +Property LogAttribute::AttributesToLog("Attributes to Log", "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.", ""); +Property LogAttribute::AttributesToIgnore("Attributes to Ignore", "A comma-separated list of Attributes to ignore. 
If not specified, no attributes will be ignored.", ""); +Property LogAttribute::LogPayload("Log Payload", + "If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.", "false"); +Property LogAttribute::LogPrefix("Log prefix", + "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.", ""); +Relationship LogAttribute::Success("success", "success operational on the flow record"); + +void LogAttribute::initialize() +{ + //! Set the supported properties + std::set<Property> properties; + properties.insert(LogLevel); + properties.insert(AttributesToLog); + properties.insert(AttributesToIgnore); + properties.insert(LogPayload); + properties.insert(LogPrefix); + setSupportedProperties(properties); + //! Set the supported relationships + std::set<Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session) +{ + std::string dashLine = "--------------------------------------------------"; + LogAttrLevel level = LogAttrLevelInfo; + bool logPayload = false; + std::ostringstream message; + + FlowFileRecord *flow = session->get(); + + if (!flow) + return; + + std::string value; + if (context->getProperty(LogLevel.getName(), value)) + { + logLevelStringToEnum(value, level); + } + if (context->getProperty(LogPrefix.getName(), value)) + { + dashLine = "-----" + value + "-----"; + } + if (context->getProperty(LogPayload.getName(), value)) + { + Property::StringToBool(value, logPayload); + } + + message << "Logging for flow file " << "\n"; + message << dashLine; + message << "\nStandard FlowFile Attributes"; + message << "\n" << "UUID:" << flow->getUUIDStr(); + message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate()); + message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate()); + message << "\n" << 
"Size:" << flow->getSize() << " Offset:" << flow->getOffset(); + message << "\nFlowFile Attributes Map Content"; + std::map<std::string, std::string> attrs = flow->getAttributes(); + std::map<std::string, std::string>::iterator it; + for (it = attrs.begin(); it!= attrs.end(); it++) + { + message << "\n" << "key:" << it->first << " value:" << it->second; + } + message << "\nFlowFile Resource Claim Content"; + ResourceClaim *claim = flow->getResourceClaim(); + if (claim) + { + message << "\n" << "Content Claim:" << claim->getContentFullPath(); + } + if (logPayload && flow->getSize() <= 1024*1024) + { + message << "\n" << "Payload:" << "\n"; + ReadCallback callback(flow->getSize()); + session->read(flow, &callback); + for (unsigned int i = 0, j = 0; i < callback._readSize; i++) + { + char temp[8]; + sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i])); + message << temp; + j++; + if (j == 16) + { + message << '\n'; + j = 0; + } + } + } + message << "\n" << dashLine << std::ends; + std::string output = message.str(); + + switch (level) + { + case LogAttrLevelInfo: + _logger->log_info("%s", output.c_str()); + break; + case LogAttrLevelDebug: + _logger->log_debug("%s", output.c_str()); + break; + case LogAttrLevelError: + _logger->log_error("%s", output.c_str()); + break; + case LogAttrLevelTrace: + _logger->log_trace("%s", output.c_str()); + break; + case LogAttrLevelWarn: + _logger->log_warn("%s", output.c_str()); + break; + default: + break; + } + + // Test Import + /* + FlowFileRecord *importRecord = session->create(); + session->import(claim->getContentFullPath(), importRecord); + session->transfer(importRecord, Success); */ + + + // Transfer to the relationship + session->transfer(flow, Success); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Logger.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Logger.cpp b/libminifi/src/Logger.cpp new file mode 100644 index 
0000000..984f609 --- /dev/null +++ b/libminifi/src/Logger.cpp @@ -0,0 +1,27 @@ +/** + * @file Logger.cpp + * Logger class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> + +#include "Logger.h" + +Logger *Logger::_logger(NULL); + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp new file mode 100644 index 0000000..70ee9d7 --- /dev/null +++ b/libminifi/src/ProcessGroup.cpp @@ -0,0 +1,314 @@ +/** + * @file ProcessGroup.cpp + * ProcessGroup class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> + +#include "ProcessGroup.h" +#include "Processor.h" + +ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent) +: _name(name), + _type(type), + _parentProcessGroup(parent) +{ + if (!uuid) + // Generate the global UUID for the flow record + uuid_generate(_uuid); + else + uuid_copy(_uuid, uuid); + + _yieldPeriodMsec = 0; + _transmitting = false; + + _logger = Logger::getLogger(); + _logger->log_info("ProcessGroup %s created", _name.c_str()); +} + +ProcessGroup::~ProcessGroup() +{ + for (std::set<Connection *>::iterator it = _connections.begin(); it != _connections.end(); ++it) + { + Connection *connection = *it; + connection->drain(); + delete connection; + } + + for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) + { + ProcessGroup *processGroup(*it); + delete processGroup; + } + + for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) + { + Processor *processor(*it); + delete processor; + } +} + +bool ProcessGroup::isRootProcessGroup() +{ + std::lock_guard<std::mutex> lock(_mtx); + return (_type == ROOT_PROCESS_GROUP); +} + +void ProcessGroup::addProcessor(Processor *processor) +{ + std::lock_guard<std::mutex> lock(_mtx); + + if (_processors.find(processor) == _processors.end()) + { + // We do not have the same processor in this process group yet + 
_processors.insert(processor); + _logger->log_info("Add processor %s into process group %s", + processor->getName().c_str(), _name.c_str()); + } +} + +void ProcessGroup::removeProcessor(Processor *processor) +{ + std::lock_guard<std::mutex> lock(_mtx); + + if (_processors.find(processor) != _processors.end()) + { + // We do have the same processor in this process group yet + _processors.erase(processor); + _logger->log_info("Remove processor %s from process group %s", + processor->getName().c_str(), _name.c_str()); + } +} + +void ProcessGroup::addProcessGroup(ProcessGroup *child) +{ + std::lock_guard<std::mutex> lock(_mtx); + + if (_childProcessGroups.find(child) == _childProcessGroups.end()) + { + // We do not have the same child process group in this process group yet + _childProcessGroups.insert(child); + _logger->log_info("Add child process group %s into process group %s", + child->getName().c_str(), _name.c_str()); + } +} + +void ProcessGroup::removeProcessGroup(ProcessGroup *child) +{ + std::lock_guard<std::mutex> lock(_mtx); + + if (_childProcessGroups.find(child) != _childProcessGroups.end()) + { + // We do have the same child process group in this process group yet + _childProcessGroups.erase(child); + _logger->log_info("Remove child process group %s from process group %s", + child->getName().c_str(), _name.c_str()); + } +} + +void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler) +{ + std::lock_guard<std::mutex> lock(_mtx); + + try + { + // Start all the processor node, input and output ports + for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) + { + Processor *processor(*it); + if (!processor->isRunning() && processor->getScheduledState() != DISABLED) + { + if (processor->getSchedulingStrategy() == TIMER_DRIVEN) + timeScheduler->schedule(processor); + } + } + + for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) + { + 
ProcessGroup *processGroup(*it); + processGroup->startProcessing(timeScheduler); + } + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + _logger->log_debug("Caught Exception during process group start processing"); + throw; + } +} + +void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler) +{ + std::lock_guard<std::mutex> lock(_mtx); + + try + { + // Stop all the processor node, input and output ports + for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) + { + Processor *processor(*it); + if (processor->getSchedulingStrategy() == TIMER_DRIVEN) + timeScheduler->unschedule(processor); + } + + for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) + { + ProcessGroup *processGroup(*it); + processGroup->stopProcessing(timeScheduler); + } + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) 
+ { + _logger->log_debug("Caught Exception during process group stop processing"); + throw; + } +} + +Processor *ProcessGroup::findProcessor(uuid_t uuid) +{ + Processor *ret = NULL; + // std::lock_guard<std::mutex> lock(_mtx); + + for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) + { + Processor *processor(*it); + uuid_t processorUUID; + if (processor->getUUID(processorUUID) && uuid_compare(processorUUID, uuid) == 0) + return processor; + } + + for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) + { + ProcessGroup *processGroup(*it); + Processor *processor = processGroup->findProcessor(uuid); + if (processor) + return processor; + } + + return ret; +} + +Processor *ProcessGroup::findProcessor(std::string processorName) +{ + Processor *ret = NULL; + + for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) + { + Processor *processor(*it); + _logger->log_debug("Current processor is %s", processor->getName().c_str()); + if (processor->getName() == processorName) + return processor; + } + + for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) + { + ProcessGroup *processGroup(*it); + Processor *processor = processGroup->findProcessor(processorName); + if (processor) + return processor; + } + + return ret; +} + +void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) +{ + std::lock_guard<std::mutex> lock(_mtx); + + for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) + { + Processor *processor(*it); + if (processor->getName() == processorName) + { + processor->setProperty(propertyName, propertyValue); + } + } + + for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) + { + ProcessGroup *processGroup(*it); + 
processGroup->updatePropertyValue(processorName, propertyName, propertyValue); + } + + return; +} + +void ProcessGroup::addConnection(Connection *connection) +{ + std::lock_guard<std::mutex> lock(_mtx); + + if (_connections.find(connection) == _connections.end()) + { + // We do not have the same connection in this process group yet + _connections.insert(connection); + _logger->log_info("Add connection %s into process group %s", + connection->getName().c_str(), _name.c_str()); + uuid_t sourceUUID; + Processor *source = NULL; + connection->getSourceProcessorUUID(sourceUUID); + source = this->findProcessor(sourceUUID); + if (source) + source->addConnection(connection); + Processor *destination = NULL; + uuid_t destinationUUID; + connection->getDestinationProcessorUUID(destinationUUID); + destination = this->findProcessor(destinationUUID); + if (destination && destination != source) + destination->addConnection(connection); + } +} + +void ProcessGroup::removeConnection(Connection *connection) +{ + std::lock_guard<std::mutex> lock(_mtx); + + if (_connections.find(connection) != _connections.end()) + { + // We do not have the same connection in this process group yet + _connections.erase(connection); + _logger->log_info("Remove connection %s into process group %s", + connection->getName().c_str(), _name.c_str()); + uuid_t sourceUUID; + Processor *source = NULL; + connection->getSourceProcessorUUID(sourceUUID); + source = this->findProcessor(sourceUUID); + if (source) + source->removeConnection(connection); + Processor *destination = NULL; + uuid_t destinationUUID; + connection->getDestinationProcessorUUID(destinationUUID); + destination = this->findProcessor(destinationUUID); + if (destination && destination != source) + destination->removeConnection(connection); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ProcessSession.cpp ---------------------------------------------------------------------- diff --git 
a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp new file mode 100644 index 0000000..4f526c3 --- /dev/null +++ b/libminifi/src/ProcessSession.cpp @@ -0,0 +1,731 @@ +/** + * @file ProcessSession.cpp + * ProcessSession class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <iostream> + +#include "ProcessSession.h" + +FlowFileRecord* ProcessSession::create() +{ + std::map<std::string, std::string> empty; + FlowFileRecord *record = new FlowFileRecord(empty); + + if (record) + { + _addedFlowFiles[record->getUUIDStr()] = record; + _logger->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str()); + } + + return record; +} + +FlowFileRecord* ProcessSession::create(FlowFileRecord *parent) +{ + FlowFileRecord *record = this->create(); + if (record) + { + // Copy attributes + std::map<std::string, std::string> parentAttributes = parent->getAttributes(); + std::map<std::string, std::string>::iterator it; + for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++) + { + if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || + it->first == FlowAttributeKey(DISCARD_REASON) || + it->first == FlowAttributeKey(UUID)) + // Do not copy special attributes from parent + continue; + record->setAttribute(it->first, it->second); + } + record->_lineageStartDate = parent->_lineageStartDate; + record->_lineageIdentifiers = parent->_lineageIdentifiers; + record->_lineageIdentifiers.insert(parent->_uuidStr); + + } + return record; +} + +FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent) +{ + FlowFileRecord *record = this->create(parent); + if (record) + { + // Copy Resource Claim + record->_claim = parent->_claim; + if (record->_claim) + { + record->_offset = parent->_offset; + record->_size = parent->_size; + record->_claim->increaseFlowFileRecordOwnedCount(); + } + } + return record; +} + +FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent) +{ + std::map<std::string, std::string> empty; + FlowFileRecord *record = new FlowFileRecord(empty); + + if (record) + { + this->_clonedFlowFiles[record->getUUIDStr()] = record; + 
_logger->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr().c_str()); + // Copy attributes + std::map<std::string, std::string> parentAttributes = parent->getAttributes(); + std::map<std::string, std::string>::iterator it; + for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++) + { + if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || + it->first == FlowAttributeKey(DISCARD_REASON) || + it->first == FlowAttributeKey(UUID)) + // Do not copy special attributes from parent + continue; + record->setAttribute(it->first, it->second); + } + record->_lineageStartDate = parent->_lineageStartDate; + record->_lineageIdentifiers = parent->_lineageIdentifiers; + record->_lineageIdentifiers.insert(parent->_uuidStr); + + // Copy Resource Claim + record->_claim = parent->_claim; + if (record->_claim) + { + record->_offset = parent->_offset; + record->_size = parent->_size; + record->_claim->increaseFlowFileRecordOwnedCount(); + } + } + + return record; +} + +FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long size) +{ + FlowFileRecord *record = this->create(parent); + if (record) + { + if (parent->_claim) + { + if ((offset + size) > (long) parent->_size) + { + // Set offset and size + _logger->log_error("clone offset %d and size %d exceed parent size %d", + offset, size, parent->_size); + // Remove the Add FlowFile for the session + std::map<std::string, FlowFileRecord *>::iterator it = + this->_addedFlowFiles.find(record->getUUIDStr()); + if (it != this->_addedFlowFiles.end()) + this->_addedFlowFiles.erase(record->getUUIDStr()); + delete record; + return NULL; + } + record->_offset = parent->_offset + parent->_offset; + record->_size = size; + // Copy Resource Claim + record->_claim = parent->_claim; + record->_claim->increaseFlowFileRecordOwnedCount(); + } + } + return record; +} + +void ProcessSession::remove(FlowFileRecord *flow) +{ + flow->_markedDelete = true; + 
_deletedFlowFiles[flow->getUUIDStr()] = flow; +} + +void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value) +{ + flow->setAttribute(key, value); +} + +void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key) +{ + flow->removeAttribute(key); +} + +void ProcessSession::penalize(FlowFileRecord *flow) +{ + flow->_penaltyExpirationMs = getTimeMillis() + this->_processContext->getProcessor()->getPenalizationPeriodMsec(); +} + +void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship) +{ + _transferRelationship[flow->getUUIDStr()] = relationship; +} + +void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback) +{ + ResourceClaim *claim = NULL; + + claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY); + + try + { + std::ofstream fs; + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); + if (fs.is_open()) + { + // Call the callback to write the content + callback->process(&fs); + if (fs.good() && fs.tellp() >= 0) + { + flow->_size = fs.tellp(); + flow->_offset = 0; + if (flow->_claim) + { + // Remove the old claim + flow->_claim->decreaseFlowFileRecordOwnedCount(); + flow->_claim = NULL; + } + flow->_claim = claim; + claim->increaseFlowFileRecordOwnedCount(); + /* + _logger->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", + flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + } + else + { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); + } + } + else + { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } + catch (std::exception &exception) + { + if (flow && flow->_claim == claim) + { + flow->_claim->decreaseFlowFileRecordOwnedCount(); + flow->_claim = NULL; + } + if (claim) + delete claim; + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch 
(...) + { + if (flow && flow->_claim == claim) + { + flow->_claim->decreaseFlowFileRecordOwnedCount(); + flow->_claim = NULL; + } + if (claim) + delete claim; + _logger->log_debug("Caught Exception during process session write"); + throw; + } +} + +void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback) +{ + ResourceClaim *claim = NULL; + + if (flow->_claim == NULL) + { + // No existed claim for append, we need to create new claim + return write(flow, callback); + } + + claim = flow->_claim; + + try + { + std::ofstream fs; + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app); + if (fs.is_open()) + { + // Call the callback to write the content + std::streampos oldPos = fs.tellp(); + callback->process(&fs); + if (fs.good() && fs.tellp() >= 0) + { + uint64_t appendSize = fs.tellp() - oldPos; + flow->_size += appendSize; + /* + _logger->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", + flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + } + else + { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); + } + } + else + { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) 
+ { + _logger->log_debug("Caught Exception during process session append"); + throw; + } +} + +void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback) +{ + try + { + ResourceClaim *claim = NULL; + if (flow->_claim == NULL) + { + // No existed claim for read, we throw exception + throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read"); + } + + claim = flow->_claim; + std::ifstream fs; + fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary); + if (fs.is_open()) + { + fs.seekg(flow->_offset, fs.beg); + + if (fs.good()) + { + callback->process(&fs); + /* + _logger->log_debug("Read offset %d size %d content %s for FlowFile UUID %s", + flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + } + else + { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error"); + } + } + else + { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) 
+ { + _logger->log_debug("Caught Exception during process session read"); + throw; + } +} + +void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepSource, uint64_t offset) +{ + ResourceClaim *claim = NULL; + + claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY); + char *buf = NULL; + int size = 4096; + buf = new char [size]; + + try + { + std::ofstream fs; + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); + std::ifstream input; + input.open(source.c_str(), std::fstream::in | std::fstream::binary); + + if (fs.is_open() && input.is_open()) + { + // Open the source file and stream to the flow file + input.seekg(offset, fs.beg); + while (input.good()) + { + input.read(buf, size); + if (input) + fs.write(buf, size); + else + fs.write(buf, input.gcount()); + } + + if (fs.good() && fs.tellp() >= 0) + { + flow->_size = fs.tellp(); + flow->_offset = 0; + if (flow->_claim) + { + // Remove the old claim + flow->_claim->decreaseFlowFileRecordOwnedCount(); + flow->_claim = NULL; + } + flow->_claim = claim; + claim->increaseFlowFileRecordOwnedCount(); + /* + _logger->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", + flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + input.close(); + if (!keepSource) + std::remove(source.c_str()); + } + else + { + fs.close(); + input.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + } + else + { + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + + delete[] buf; + } + catch (std::exception &exception) + { + if (flow && flow->_claim == claim) + { + flow->_claim->decreaseFlowFileRecordOwnedCount(); + flow->_claim = NULL; + } + if (claim) + delete claim; + _logger->log_debug("Caught Exception %s", exception.what()); + delete[] buf; + throw; + } + catch (...) 
+ { + if (flow && flow->_claim == claim) + { + flow->_claim->decreaseFlowFileRecordOwnedCount(); + flow->_claim = NULL; + } + if (claim) + delete claim; + _logger->log_debug("Caught Exception during process session write"); + delete[] buf; + throw; + } +} + +void ProcessSession::commit() +{ + try + { + // First we clone the flow record based on the transfered relationship for updated flow record + std::map<std::string, FlowFileRecord *>::iterator it; + for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_markedDelete) + continue; + std::map<std::string, Relationship>::iterator itRelationship = + this->_transferRelationship.find(record->getUUIDStr()); + if (itRelationship != _transferRelationship.end()) + { + Relationship relationship = itRelationship->second; + // Find the relationship, we need to find the connections for that relationship + std::set<Connection *> connections = + _processContext->getProcessor()->getOutGoingConnections(relationship.getName()); + if (connections.empty()) + { + // No connection + if (!_processContext->getProcessor()->isAutoTerminated(relationship)) + { + // Not autoterminate, we should have the connect + std::string message = "Connect empty for non auto terminated relationship" + relationship.getName(); + throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); + } + else + { + // Autoterminated + remove(record); + } + } + else + { + // We connections, clone the flow and assign the connection accordingly + for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) + { + Connection *connection(*itConnection); + if (itConnection == connections.begin()) + { + // First connection which the flow need be routed to + record->_connection = connection; + } + else + { + // Clone the flow file and route to the connection + FlowFileRecord *cloneRecord; + cloneRecord = 
this->cloneDuringTransfer(record); + if (cloneRecord) + cloneRecord->_connection = connection; + else + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer"); + } + } + } + } + else + { + // Can not find relationship for the flow + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); + } + } + + // Do the samething for added flow file + for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_markedDelete) + continue; + std::map<std::string, Relationship>::iterator itRelationship = + this->_transferRelationship.find(record->getUUIDStr()); + if (itRelationship != _transferRelationship.end()) + { + Relationship relationship = itRelationship->second; + // Find the relationship, we need to find the connections for that relationship + std::set<Connection *> connections = + _processContext->getProcessor()->getOutGoingConnections(relationship.getName()); + if (connections.empty()) + { + // No connection + if (!_processContext->getProcessor()->isAutoTerminated(relationship)) + { + // Not autoterminate, we should have the connect + std::string message = "Connect empty for non auto terminated relationship " + relationship.getName(); + throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); + } + else + { + // Autoterminated + remove(record); + } + } + else + { + // We connections, clone the flow and assign the connection accordingly + for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) + { + Connection *connection(*itConnection); + if (itConnection == connections.begin()) + { + // First connection which the flow need be routed to + record->_connection = connection; + } + else + { + // Clone the flow file and route to the connection + FlowFileRecord *cloneRecord; + cloneRecord = this->cloneDuringTransfer(record); + if (cloneRecord) + cloneRecord->_connection 
= connection; + else + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer"); + } + } + } + } + else + { + // Can not find relationship for the flow + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); + } + } + + // Complete process the added and update flow files for the session, send the flow file to its queue + for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_markedDelete) + { + continue; + } + if (record->_connection) + record->_connection->put(record); + else + delete record; + } + for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_markedDelete) + { + continue; + } + if (record->_connection) + record->_connection->put(record); + else + delete record; + } + // Process the clone flow files + for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_markedDelete) + { + continue; + } + if (record->_connection) + record->_connection->put(record); + else + delete record; + } + // Delete the deleted flow files + for (it = _deletedFlowFiles.begin(); it!= _deletedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + delete record; + } + // Delete the snapshot + for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + delete record; + } + // All done + _updatedFlowFiles.clear(); + _addedFlowFiles.clear(); + _clonedFlowFiles.clear(); + _deletedFlowFiles.clear(); + _originalFlowFiles.clear(); + _logger->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str()); + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) 
+ { + _logger->log_debug("Caught Exception during process session commit"); + throw; + } +} + + +void ProcessSession::rollback() +{ + try + { + std::map<std::string, FlowFileRecord *>::iterator it; + // Requeue the snapshot of the flowfile back + for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_orginalConnection) + { + record->_snapshot = false; + record->_orginalConnection->put(record); + } + else + delete record; + } + _originalFlowFiles.clear(); + // Process the clone flow files + for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + delete record; + } + _clonedFlowFiles.clear(); + for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + delete record; + } + _addedFlowFiles.clear(); + for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + delete record; + } + _updatedFlowFiles.clear(); + _deletedFlowFiles.clear(); + _logger->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str()); + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) 
+ { + _logger->log_debug("Caught Exception during process session roll back"); + throw; + } +} + +FlowFileRecord *ProcessSession::get() +{ + Connection *first = _processContext->getProcessor()->getNextIncomingConnection(); + + if (first == NULL) + return NULL; + + Connection *current = first; + + do + { + std::set<FlowFileRecord *> expired; + FlowFileRecord *ret = current->poll(expired); + if (expired.size() > 0) + { + // Remove expired flow record + for (std::set<FlowFileRecord *>::iterator it = expired.begin(); it != expired.end(); ++it) + { + delete (*it); + } + } + if (ret) + { + // add the flow record to the current process session update map + ret->_markedDelete = false; + _updatedFlowFiles[ret->getUUIDStr()] = ret; + std::map<std::string, std::string> empty; + FlowFileRecord *snapshot = new FlowFileRecord(empty); + _logger->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str()); + snapshot->duplicate(ret); + // save a snapshot + _originalFlowFiles[snapshot->getUUIDStr()] = snapshot; + return ret; + } + current = _processContext->getProcessor()->getNextIncomingConnection(); + } + while (current != NULL && current != first); + + return NULL; +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp new file mode 100644 index 0000000..cc136dc --- /dev/null +++ b/libminifi/src/Processor.cpp @@ -0,0 +1,451 @@ +/** + * @file Processor.cpp + * Processor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> + +#include "Processor.h" +#include "ProcessContext.h" +#include "ProcessSession.h" + +Processor::Processor(std::string name, uuid_t uuid) +: _name(name) +{ + if (!uuid) + // Generate the global UUID for the flow record + uuid_generate(_uuid); + else + uuid_copy(_uuid, uuid); + + char uuidStr[37]; + uuid_unparse(_uuid, uuidStr); + _uuidStr = uuidStr; + + // Setup the default values + _state = DISABLED; + _strategy = TIMER_DRIVEN; + _lossTolerant = false; + _triggerWhenEmpty = false; + _schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS; + _runDurantionNano = 0; + _yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000; + _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000; + _maxConcurrentTasks = 1; + _activeTasks = 0; + _yieldExpiration = 0; + _incomingConnectionsIter = this->_incomingConnections.begin(); + _logger = Logger::getLogger(); + + _logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str()); +} + +Processor::~Processor() +{ + +} + +bool Processor::isRunning() +{ + return (_state == RUNNING && _activeTasks > 0); +} + +bool Processor::setSupportedProperties(std::set<Property> properties) +{ + if (isRunning()) + { + _logger->log_info("Can not set processor property while the process %s is running", + _name.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(_mtx); + + _properties.clear(); + for (std::set<Property>::iterator it = 
properties.begin(); it != properties.end(); ++it) + { + Property item(*it); + _properties[item.getName()] = item; + _logger->log_info("Processor %s supported property name %s", _name.c_str(), item.getName().c_str()); + } + + return true; +} + +bool Processor::setSupportedRelationships(std::set<Relationship> relationships) +{ + if (isRunning()) + { + _logger->log_info("Can not set processor supported relationship while the process %s is running", + _name.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(_mtx); + + _relationships.clear(); + for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it) + { + Relationship item(*it); + _relationships[item.getName()] = item; + _logger->log_info("Processor %s supported relationship name %s", _name.c_str(), item.getName().c_str()); + } + + return true; +} + +bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationships) +{ + if (isRunning()) + { + _logger->log_info("Can not set processor auto terminated relationship while the process %s is running", + _name.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(_mtx); + + _autoTerminatedRelationships.clear(); + for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it) + { + Relationship item(*it); + _autoTerminatedRelationships[item.getName()] = item; + _logger->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), item.getName().c_str()); + } + + return true; +} + +bool Processor::isAutoTerminated(Relationship relationship) +{ + bool isRun = isRunning(); + + if (!isRun) + _mtx.lock(); + + std::map<std::string, Relationship>::iterator it = _autoTerminatedRelationships.find(relationship.getName()); + if (it != _autoTerminatedRelationships.end()) + { + if (!isRun) + _mtx.unlock(); + return true; + } + else + { + if (!isRun) + _mtx.unlock(); + return false; + } +} + +bool Processor::isSupportedRelationship(Relationship 
relationship) +{ + bool isRun = isRunning(); + + if (!isRun) + _mtx.lock(); + + std::map<std::string, Relationship>::iterator it = _relationships.find(relationship.getName()); + if (it != _relationships.end()) + { + if (!isRun) + _mtx.unlock(); + return true; + } + else + { + if (!isRun) + _mtx.unlock(); + return false; + } +} + +bool Processor::getProperty(std::string name, std::string &value) +{ + bool isRun = isRunning(); + + if (!isRun) + // Because set property only allowed in non running state, we need to obtain lock avoid rack condition + _mtx.lock(); + + std::map<std::string, Property>::iterator it = _properties.find(name); + if (it != _properties.end()) + { + Property item = it->second; + value = item.getValue(); + if (!isRun) + _mtx.unlock(); + return true; + } + else + { + if (!isRun) + _mtx.unlock(); + return false; + } +} + +bool Processor::setProperty(std::string name, std::string value) +{ + + std::lock_guard<std::mutex> lock(_mtx); + std::map<std::string, Property>::iterator it = _properties.find(name); + + if (it != _properties.end()) + { + Property item = it->second; + item.setValue(value); + _properties[item.getName()] = item; + _logger->log_info("Processor %s property name %s value %s", _name.c_str(), item.getName().c_str(), value.c_str()); + return true; + } + else + { + return false; + } +} + +std::set<Connection *> Processor::getOutGoingConnections(std::string relationship) +{ + std::set<Connection *> empty; + + std::map<std::string, std::set<Connection *>>::iterator it = _outGoingConnections.find(relationship); + if (it != _outGoingConnections.end()) + { + return _outGoingConnections[relationship]; + } + else + { + return empty; + } +} + +bool Processor::addConnection(Connection *connection) +{ + bool ret = false; + + if (isRunning()) + { + _logger->log_info("Can not add connection while the process %s is running", + _name.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(_mtx); + + uuid_t srcUUID; + uuid_t destUUID; + + 
connection->getSourceProcessorUUID(srcUUID); + connection->getDestinationProcessorUUID(destUUID); + + if (uuid_compare(_uuid, destUUID) == 0) + { + // Connection is destination to the current processor + if (_incomingConnections.find(connection) == _incomingConnections.end()) + { + _incomingConnections.insert(connection); + connection->setDestinationProcessor(this); + _logger->log_info("Add connection %s into Processor %s incoming connection", + connection->getName().c_str(), _name.c_str()); + _incomingConnectionsIter = this->_incomingConnections.begin(); + ret = true; + } + } + + if (uuid_compare(_uuid, srcUUID) == 0) + { + std::string relationship = connection->getRelationship().getName(); + // Connection is source from the current processor + std::map<std::string, std::set<Connection *>>::iterator it = + _outGoingConnections.find(relationship); + if (it != _outGoingConnections.end()) + { + // We already has connection for this relationship + std::set<Connection *> existedConnection = it->second; + if (existedConnection.find(connection) == existedConnection.end()) + { + // We do not have the same connection for this relationship yet + existedConnection.insert(connection); + connection->setSourceProcessor(this); + _outGoingConnections[relationship] = existedConnection; + _logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", + connection->getName().c_str(), _name.c_str(), relationship.c_str()); + ret = true; + } + } + else + { + // We do not have any outgoing connection for this relationship yet + std::set<Connection *> newConnection; + newConnection.insert(connection); + connection->setSourceProcessor(this); + _outGoingConnections[relationship] = newConnection; + _logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", + connection->getName().c_str(), _name.c_str(), relationship.c_str()); + ret = true; + } + } + + return ret; +} + +void Processor::removeConnection(Connection 
*connection) +{ + if (isRunning()) + { + _logger->log_info("Can not remove connection while the process %s is running", + _name.c_str()); + return; + } + + std::lock_guard<std::mutex> lock(_mtx); + + uuid_t srcUUID; + uuid_t destUUID; + + connection->getSourceProcessorUUID(srcUUID); + connection->getDestinationProcessorUUID(destUUID); + + if (uuid_compare(_uuid, destUUID) == 0) + { + // Connection is destination to the current processor + if (_incomingConnections.find(connection) != _incomingConnections.end()) + { + _incomingConnections.erase(connection); + connection->setDestinationProcessor(NULL); + _logger->log_info("Remove connection %s into Processor %s incoming connection", + connection->getName().c_str(), _name.c_str()); + _incomingConnectionsIter = this->_incomingConnections.begin(); + } + } + + if (uuid_compare(_uuid, srcUUID) == 0) + { + std::string relationship = connection->getRelationship().getName(); + // Connection is source from the current processor + std::map<std::string, std::set<Connection *>>::iterator it = + _outGoingConnections.find(relationship); + if (it == _outGoingConnections.end()) + { + return; + } + else + { + if (_outGoingConnections[relationship].find(connection) != _outGoingConnections[relationship].end()) + { + _outGoingConnections[relationship].erase(connection); + connection->setSourceProcessor(NULL); + _logger->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s", + connection->getName().c_str(), _name.c_str(), relationship.c_str()); + } + } + } +} + +Connection *Processor::getNextIncomingConnection() +{ + std::lock_guard<std::mutex> lock(_mtx); + + if (_incomingConnections.size() == 0) + return NULL; + + if (_incomingConnectionsIter == _incomingConnections.end()) + _incomingConnectionsIter = _incomingConnections.begin(); + + Connection *ret = *_incomingConnectionsIter; + _incomingConnectionsIter++; + + if (_incomingConnectionsIter == _incomingConnections.end()) + _incomingConnectionsIter = 
_incomingConnections.begin(); + + return ret; +} + +bool Processor::flowFilesQueued() +{ + std::lock_guard<std::mutex> lock(_mtx); + + if (_incomingConnections.size() == 0) + return false; + + for (std::set<Connection *>::iterator it = _incomingConnections.begin(); it != _incomingConnections.end(); ++it) + { + Connection *connection = *it; + if (connection->getQueueSize() > 0) + return true; + } + + return false; +} + +bool Processor::flowFilesOutGoingFull() +{ + std::lock_guard<std::mutex> lock(_mtx); + + std::map<std::string, std::set<Connection *>>::iterator it; + + for (it = _outGoingConnections.begin(); it != _outGoingConnections.end(); ++it) + { + // We already has connection for this relationship + std::set<Connection *> existedConnection = it->second; + for (std::set<Connection *>::iterator itConnection = existedConnection.begin(); itConnection != existedConnection.end(); ++itConnection) + { + Connection *connection = *itConnection; + if (connection->isFull()) + return true; + } + } + + return false; +} + +void Processor::onTrigger() +{ + ProcessContext *context = new ProcessContext(this); + ProcessSession *session = new ProcessSession(context); + try { + // Call the child onTrigger function + this->onTrigger(context, session); + session->commit(); + delete session; + delete context; + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + session->rollback(); + delete session; + delete context; + throw; + } + catch (...) 
+ { + _logger->log_debug("Caught Exception Processor::onTrigger"); + session->rollback(); + delete session; + delete context; + throw; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/RealTimeDataCollector.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RealTimeDataCollector.cpp b/libminifi/src/RealTimeDataCollector.cpp new file mode 100644 index 0000000..c7118ff --- /dev/null +++ b/libminifi/src/RealTimeDataCollector.cpp @@ -0,0 +1,482 @@ +/** + * @file RealTimeDataCollector.cpp + * RealTimeDataCollector class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <random> +#include <netinet/tcp.h> + +#include "RealTimeDataCollector.h" +#include "ProcessContext.h" +#include "ProcessSession.h" + +const std::string RealTimeDataCollector::ProcessorName("RealTimeDataCollector"); +Property RealTimeDataCollector::FILENAME("File Name", "File Name for the real time processor to process", "data.osp"); +Property RealTimeDataCollector::REALTIMESERVERNAME("Real Time Server Name", "Real Time Server Name", "localhost"); +Property RealTimeDataCollector::REALTIMESERVERPORT("Real Time Server Port", "Real Time Server Port", "10000"); +Property RealTimeDataCollector::BATCHSERVERNAME("Batch Server Name", "Batch Server Name", "localhost"); +Property RealTimeDataCollector::BATCHSERVERPORT("Batch Server Port", "Batch Server Port", "10001"); +Property RealTimeDataCollector::ITERATION("Iteration", + "If true, sample osp file will be iterated", "true"); +Property RealTimeDataCollector::REALTIMEMSGID("Real Time Message ID", "Real Time Message ID", "41"); +Property RealTimeDataCollector::BATCHMSGID("Batch Message ID", "Batch Message ID", "172, 30, 48"); +Property RealTimeDataCollector::REALTIMEINTERVAL("Real Time Interval", "Real Time Data Collection Interval in msec", "10 ms"); +Property RealTimeDataCollector::BATCHINTERVAL("Batch Time Interval", "Batch Processing Interval in msec", "100 ms"); +Property RealTimeDataCollector::BATCHMAXBUFFERSIZE("Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144"); +Relationship RealTimeDataCollector::Success("success", "success operational on the flow record"); + +void RealTimeDataCollector::initialize() +{ + //! 
Set the supported properties + std::set<Property> properties; + properties.insert(FILENAME); + properties.insert(REALTIMESERVERNAME); + properties.insert(REALTIMESERVERPORT); + properties.insert(BATCHSERVERNAME); + properties.insert(BATCHSERVERPORT); + properties.insert(ITERATION); + properties.insert(REALTIMEMSGID); + properties.insert(BATCHMSGID); + properties.insert(REALTIMEINTERVAL); + properties.insert(BATCHINTERVAL); + properties.insert(BATCHMAXBUFFERSIZE); + + setSupportedProperties(properties); + //! Set the supported relationships + std::set<Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); + +} + +int RealTimeDataCollector::connectServer(const char *host, uint16_t port) +{ + in_addr_t addr; + int sock = 0; + struct hostent *h; +#ifdef __MACH__ + h = gethostbyname(host); +#else + char buf[1024]; + struct hostent he; + int hh_errno; + gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); +#endif + memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) + { + _logger->log_error("Could not create socket to hostName %s", host); + return 0; + } + +#ifndef __MACH__ + int opt = 1; + bool nagle_off = true; + + if (nagle_off) + { + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) + { + _logger->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + return 0; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&opt, sizeof(opt)) < 0) + { + _logger->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return 0; + } + } + + int sndsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) + { + _logger->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + return 0; + } +#endif + + struct sockaddr_in sa; + socklen_t socklen; + int status; + + //TODO bind socket to the interface + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + 
sa.sin_addr.s_addr = htonl(INADDR_ANY); + sa.sin_port = htons(0); + socklen = sizeof(sa); + if (bind(sock, (struct sockaddr *)&sa, socklen) < 0) + { + _logger->log_error("socket bind failed"); + close(sock); + return 0; + } + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = addr; + sa.sin_port = htons(port); + socklen = sizeof(sa); + + status = connect(sock, (struct sockaddr *)&sa, socklen); + + if (status < 0) + { + _logger->log_error("socket connect failed to %s %d", host, port); + close(sock); + return 0; + } + + _logger->log_info("socket %d connect to server %s port %d success", sock, host, port); + + return sock; +} +

/**
 * Send a whole buffer over a connected socket, looping over partial writes.
 *
 * @param socket  connected socket descriptor
 * @param buf     start of the data to send
 * @param buflen  number of bytes to send
 * @return -1 as soon as send() reports an error; otherwise the result of the
 *         last send() call (0 when the loop never ran because buflen <= 0)
 */
int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen)
{
	int ret = 0, bytes = 0;

	while (bytes < buflen)
	{
		ret = send(socket, buf + bytes, buflen - bytes, 0);
		// send() failed: give the error back to the caller, which closes the socket
		if (ret == -1)
		{
			return ret;
		}
		bytes += ret;
	}

	if (ret)
		_logger->log_debug("Send data size %d over socket %d", buflen, socket);

	return ret;
}

/**
 * Real time half of the processor.
 *
 * Once the accumulated scheduling time reaches _realTimeInterval this
 * re-reads the real time / batch message ID lists, then reads lines from the
 * data file. The first comma separated cell of each line is the message ID:
 *  - a line whose ID is in the batch list is put on the batch queue;
 *  - a line whose ID is in the real time list is sent over the real time
 *    socket, and is put on the batch queue instead when the socket is not
 *    connected or the send fails.
 * Reading stops after the first line that matched a real time ID, or at end
 * of file (in which case the file is closed and re-opened on a later trigger).
 *
 * @param context  process context, used for the scheduling period
 * @param session  process session (unused here)
 */
void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, ProcessSession *session)
{
	if (_realTimeAccumulated >= this->_realTimeInterval)
	{
		std::string value;
		if (this->getProperty(REALTIMEMSGID.getName(), value))
		{
			this->_realTimeMsgID.clear();
			this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
			std::stringstream lineStream(value);
			std::string cell;

			while (std::getline(lineStream, cell, ','))
			{
				// Trim like the batch IDs below: the cell read from the data file is
				// trimmed before it is compared, so an untrimmed ID could never match
				cell = Property::trim(cell);
				this->_realTimeMsgID.push_back(cell);
				// this->_logger->log_debug("Real Time Msg ID %s", cell.c_str());
			}
		}
		if (this->getProperty(BATCHMSGID.getName(), value))
		{
			this->_batchMsgID.clear();
			this->_logger->log_info("Batch Msg IDs %s", value.c_str());
			std::stringstream lineStream(value);
			std::string cell;

			while (std::getline(lineStream, cell, ','))
			{
				cell = Property::trim(cell);
				this->_batchMsgID.push_back(cell);
				// this->_logger->log_debug("Batch Msg ID %s", cell.c_str());
			}
		}

		// Put one line on the batch queue, dropping the oldest entries first so the
		// queued bytes stay within _batchMaxBufferSize. The emptiness check stops
		// the eviction when a single line is larger than the whole budget; calling
		// front() on an empty queue is undefined behaviour. Such a line is still queued.
		auto queueForBatch = [this](const std::string &data, const std::string &msgId, bool realTime)
		{
			std::lock_guard<std::mutex> lock(_mtx);
			while (!_queue.empty() && (_queuedDataSize + data.size()) > _batchMaxBufferSize)
			{
				std::string item = _queue.front();
				_queuedDataSize -= item.size();
				_logger->log_debug("Pop item size %d from batch queue, queue buffer size %d", item.size(), _queuedDataSize);
				_queue.pop();
			}
			_queue.push(data);
			_queuedDataSize += data.size();
			if (realTime)
				_logger->log_debug("Push real time msg ID %s into batch queue, queue buffer size %d", msgId.c_str(), _queuedDataSize);
			else
				_logger->log_debug("Push batch msg ID %s into batch queue, queue buffer size %d", msgId.c_str(), _queuedDataSize);
		};

		// _logger->log_info("onTriggerRealTime");
		// Open the file
		if (!this->_fileStream.is_open())
		{
			_fileStream.open(this->_fileName.c_str(), std::ifstream::in);
			if (this->_fileStream.is_open())
				_logger->log_debug("open %s", _fileName.c_str());
		}
		if (!_fileStream.good())
		{
			_logger->log_error("load data file failed %s", _fileName.c_str());
			return;
		}
		if (this->_fileStream.is_open())
		{
			std::string line;

			while (std::getline(_fileStream, line))
			{
				line += "\n";
				std::stringstream lineStream(line);
				std::string cell;
				if (std::getline(lineStream, cell, ','))
				{
					cell = Property::trim(cell);
					// Check whether it match to the batch traffic
					for (const std::string &batchId : _batchMsgID)
					{
						if (cell == batchId)
							queueForBatch(line, cell, false);
					}
					bool findRealTime = false;
					// Check whether it match to the real time traffic
					for (const std::string &realTimeId : _realTimeMsgID)
					{
						if (cell != realTimeId)
							continue;

						int status = 0;
						if (this->_realTimeSocket <= 0)
						{
							// Connect the LTE socket
							uint16_t port = _realTimeServerPort;
							this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
						}
						if (this->_realTimeSocket > 0)
						{
							// try to send the data
							status = sendData(_realTimeSocket, line.data(), line.size());
							if (status < 0)
							{
								close(_realTimeSocket);
								_realTimeSocket = 0;
							}
						}
						if (this->_realTimeSocket <= 0 || status < 0)
						{
							// could not deliver in real time: fall back to the batch queue
							queueForBatch(line, cell, true);
						}
						findRealTime = true;
					} // for real time pattern
					if (findRealTime)
						// we break the while once we find the first real time
						break;
				} // if get line
			} // while
			if (_fileStream.eof())
			{
				_fileStream.close();
			}
		} // if open
		_realTimeAccumulated = 0;
	}
	_realTimeAccumulated += context->getProcessor()->getSchedulingPeriodNano();
}

/**
 * Batch half of the processor.
 *
 * Once the accumulated scheduling time reaches _batchInterval this connects
 * the batch socket if needed and sends the queued lines in order. A line is
 * removed from the queue only after it was sent successfully; when a send
 * fails the socket is closed and the line stays at the head of the queue so
 * a later trigger retries it instead of losing it.
 *
 * @param context  process context, used for the scheduling period
 * @param session  process session (unused here)
 */
void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, ProcessSession *session)
{
	if (_batchAcccumulated >= this->_batchInterval)
	{
		// _logger->log_info("onTriggerBatch");
		// dequeue the batch and send over WIFI
		if (this->_batchSocket <= 0)
		{
			// Connect the WIFI socket
			uint16_t port = _batchServerPort;
			this->_batchSocket = connectServer(_batchServerName.c_str(), port);
		}
		if (this->_batchSocket > 0)
		{
			std::lock_guard<std::mutex> lock(_mtx);

			while (!_queue.empty())
			{
				const std::string &line = _queue.front();
				int status = sendData(_batchSocket, line.data(), line.size());
				if (status < 0)
				{
					// leave the unsent line queued and reconnect on a later trigger
					close(_batchSocket);
					_batchSocket = 0;
					break;
				}
				// update the byte count before pop() invalidates the reference
				_queuedDataSize -= line.size();
				_queue.pop();
			}
		}
		_batchAcccumulated = 0;
	}
	_batchAcccumulated += context->getProcessor()->getSchedulingPeriodNano();
}

/**
 * Entry point called by the scheduler.
 *
 * The calling thread decides which half runs: the thread recorded as
 * _realTimeThreadId runs onTriggerRealTime(), the one recorded as
 * _batchThreadId runs onTriggerBatch(). A call from any other thread is a
 * set-up call made under _mtx:
 *  - when _firstInvoking is not set, the properties are read, both sockets
 *    are connected, the data file is opened and the caller becomes the real
 *    time thread;
 *  - otherwise the caller becomes the batch thread.
 *
 * @param context  process context forwarded to the two halves
 * @param session  process session forwarded to the two halves
 */
void RealTimeDataCollector::onTrigger(ProcessContext *context, ProcessSession *session)
{
	std::thread::id id = std::this_thread::get_id();

	if (id == _realTimeThreadId)
	{
		onTriggerRealTime(context, session);
		return;
	}
	if (id == _batchThreadId)
	{
		onTriggerBatch(context, session);
		return;
	}

	std::lock_guard<std::mutex> lock(_mtx);
	if (this->_firstInvoking)
	{
		// The set-up already ran: this caller is the second thread
		if (id != _realTimeThreadId)
			_batchThreadId = id;
		this->_firstInvoking = false;
		return;
	}

	this->_fileName = "data.osp";
	std::string value;
	if (this->getProperty(FILENAME.getName(), value))
	{
		this->_fileName = value;
		this->_logger->log_info("Data Collector File Name %s", _fileName.c_str());
	}
	this->_realTimeServerName = "localhost";
	if (this->getProperty(REALTIMESERVERNAME.getName(), value))
	{
		this->_realTimeServerName = value;
		this->_logger->log_info("Real Time Server Name %s", this->_realTimeServerName.c_str());
	}
	this->_realTimeServerPort = 10000;
	if (this->getProperty(REALTIMESERVERPORT.getName(), value))
	{
		Property::StringToInt(value, _realTimeServerPort);
		this->_logger->log_info("Real Time Server Port %d", _realTimeServerPort);
	}
	if (this->getProperty(BATCHSERVERNAME.getName(), value))
	{
		this->_batchServerName = value;
		this->_logger->log_info("Batch Server Name %s", this->_batchServerName.c_str());
	}
	this->_batchServerPort = 10001;
	if (this->getProperty(BATCHSERVERPORT.getName(), value))
	{
		Property::StringToInt(value, _batchServerPort);
		this->_logger->log_info("Batch Server Port %d", _batchServerPort);
	}
	if (this->getProperty(ITERATION.getName(), value))
	{
		Property::StringToBool(value, this->_iteration);
		_logger->log_info("Iteration %d", _iteration);
	}
	this->_realTimeInterval = 10000000; //10 msec
	if (this->getProperty(REALTIMEINTERVAL.getName(), value))
	{
		TimeUnit unit;
		if (Property::StringToTime(value, _realTimeInterval, unit) &&
				Property::ConvertTimeUnitToNS(_realTimeInterval, unit, _realTimeInterval))
		{
			_logger->log_info("Real Time Interval: [%d] ns", _realTimeInterval);
		}
	}
	this->_batchInterval = 100000000; //100 msec
	if (this->getProperty(BATCHINTERVAL.getName(), value))
	{
		TimeUnit unit;
		if (Property::StringToTime(value, _batchInterval, unit) &&
				Property::ConvertTimeUnitToNS(_batchInterval, unit, _batchInterval))
		{
			_logger->log_info("Batch Time Interval: [%d] ns", _batchInterval);
		}
	}
	this->_batchMaxBufferSize = 256*1024;
	if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value))
	{
		Property::StringToInt(value, _batchMaxBufferSize);
		this->_logger->log_info("Batch Max Buffer Size %d", _batchMaxBufferSize);
	}
	if (this->getProperty(REALTIMEMSGID.getName(), value))
	{
		// This set-up path runs again when the file open below failed, so start
		// from an empty list instead of appending the same IDs a second time
		this->_realTimeMsgID.clear();
		this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
		std::stringstream lineStream(value);
		std::string cell;

		while (std::getline(lineStream, cell, ','))
		{
			// Trimmed for the same reason as the batch IDs: the file cell it is
			// compared with is trimmed
			cell = Property::trim(cell);
			this->_realTimeMsgID.push_back(cell);
			this->_logger->log_info("Real Time Msg ID %s", cell.c_str());
		}
	}
	if (this->getProperty(BATCHMSGID.getName(), value))
	{
		this->_batchMsgID.clear();
		this->_logger->log_info("Batch Msg IDs %s", value.c_str());
		std::stringstream lineStream(value);
		std::string cell;

		while (std::getline(lineStream, cell, ','))
		{
			cell = Property::trim(cell);
			this->_batchMsgID.push_back(cell);
			this->_logger->log_info("Batch Msg ID %s", cell.c_str());
		}
	}
	// Connect the LTE socket
	uint16_t port = _realTimeServerPort;

	this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);

	// Connect the WIFI socket
	port = _batchServerPort;

	this->_batchSocket = connectServer(_batchServerName.c_str(), port);

	// Open the file
	_fileStream.open(this->_fileName.c_str(), std::ifstream::in);
	if (!_fileStream.good())
	{
		// _firstInvoking stays unset, so the whole set-up is attempted again
		_logger->log_error("load data file failed %s", _fileName.c_str());
		return;
	}
	_logger->log_debug("open %s", _fileName.c_str());

	_realTimeThreadId = id;
	this->_firstInvoking = true;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/RemoteProcessorGroupPort.cpp ----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp new file mode 100644 index 0000000..9d849ae --- /dev/null +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -0,0 +1,100 @@ +/** + * @file RemoteProcessorGroupPort.cpp + * RemoteProcessorGroupPort class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <sstream> +#include <string.h> +#include <iostream> + +#include "TimeUtil.h" +#include "RemoteProcessorGroupPort.h" +#include "ProcessContext.h" +#include "ProcessSession.h" + +const std::string RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort"); +Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "localhost"); +Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); +Relationship RemoteProcessorGroupPort::relation; + +void RemoteProcessorGroupPort::initialize() +{ + //! Set the supported properties + std::set<Property> properties; + properties.insert(hostName); + properties.insert(port); + setSupportedProperties(properties); + //! 
Set the supported relationships + std::set<Relationship> relationships; + relationships.insert(relation); + setSupportedRelationships(relationships); +} + +void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession *session) +{ + std::string value; + + if (!_transmitting) + return; + + std::string host = _peer->getHostName(); + uint16_t sport = _peer->getPort(); + int64_t lvalue; + bool needReset = false; + + if (context->getProperty(hostName.getName(), value)) + { + host = value; + } + if (context->getProperty(port.getName(), value) && Property::StringToInt(value, lvalue)) + { + sport = (uint16_t) lvalue; + } + if (host != _peer->getHostName()) + { + _peer->setHostName(host); + needReset= true; + } + if (sport != _peer->getPort()) + { + _peer->setPort(sport); + needReset = true; + } + if (needReset) + _protocol->tearDown(); + + if (!_protocol->bootstrap()) + { + // bootstrap the client protocol if needeed + context->yield(); + _logger->log_error("Site2Site bootstrap failed yield period %d peer timeout %d", context->getProcessor()->getYieldPeriodMsec(), _protocol->getPeer()->getTimeOut()); + return; + } + + if (_direction == RECEIVE) + _protocol->receiveFlowFiles(context, session); + else + _protocol->transferFlowFiles(context, session); + + return; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ResourceClaim.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp new file mode 100644 index 0000000..3c22ac9 --- /dev/null +++ b/libminifi/src/ResourceClaim.cpp @@ -0,0 +1,45 @@ +/** + * @file ResourceClaim.cpp + * ResourceClaim class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> + +#include "ResourceClaim.h" + +std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0); + +ResourceClaim::ResourceClaim(const std::string contentDirectory) +: _id(_localResourceClaimNumber.load()), + _flowFileRecordOwnedCount(0) +{ + char uuidStr[37]; + + // Generate the global UUID for the resource claim + uuid_generate(_uuid); + // Increase the local ID for the resource claim + ++_localResourceClaimNumber; + uuid_unparse(_uuid, uuidStr); + // Create the full content path for the content + _contentFullPath = contentDirectory + "/" + uuidStr; + + _configure = Configure::getConfigure(); + _logger = Logger::getLogger(); + _logger->log_debug("Resource Claim created %s", _contentFullPath.c_str()); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp new file mode 100644 index 0000000..211c328 --- /dev/null +++ b/libminifi/src/SchedulingAgent.cpp @@ -0,0 +1,86 @@ +/** + * @file SchedulingAgent.cpp + * SchedulingAgent class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <chrono> +#include <thread> +#include <iostream> +#include "Exception.h" +#include "SchedulingAgent.h" + +bool SchedulingAgent::hasWorkToDo(Processor *processor) +{ + // Whether it has work to do + if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() || + processor->flowFilesQueued()) + return true; + else + return false; +} + +bool SchedulingAgent::hasTooMuchOutGoing(Processor *processor) +{ + return processor->flowFilesOutGoingFull(); +} + +bool SchedulingAgent::onTrigger(Processor *processor) +{ + if (processor->isYield()) + return false; + + // No need to yield, reset yield expiration to 0 + processor->clearYield(); + + if (!hasWorkToDo(processor)) + // No work to do, yield + return true; + + if(hasTooMuchOutGoing(processor)) + // need to apply backpressure + return true; + + //TODO runDuration + + processor->incrementActiveTasks(); + try + { + processor->onTrigger(); + processor->decrementActiveTask(); + } + catch (Exception &exception) + { + // Normal exception + _logger->log_debug("Caught Exception %s", exception.what()); + processor->decrementActiveTask(); + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + processor->yield(_administrativeYieldDuration); + 
processor->decrementActiveTask(); + } + catch (...) + { + _logger->log_debug("Caught Exception during SchedulingAgent::onTrigger"); + processor->yield(_administrativeYieldDuration); + processor->decrementActiveTask(); + } + + return false; +} +