http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ListenSyslog.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ListenSyslog.cpp b/libminifi/src/ListenSyslog.cpp deleted file mode 100644 index f2901e0..0000000 --- a/libminifi/src/ListenSyslog.cpp +++ /dev/null @@ -1,343 +0,0 @@ -/** - * @file ListenSyslog.cpp - * ListenSyslog class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <queue> -#include <stdio.h> -#include <string> -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" -#include "ListenSyslog.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string ListenSyslog::ProcessorName("ListenSyslog"); -Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B"); -Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB"); -Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2"); -Property ListenSyslog::MaxBatchSize("Max Batch Size", - "The maximum number of Syslog events to add to a single FlowFile.", "1"); -Property ListenSyslog::MessageDelimiter("Message Delimiter", - "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).", "\n"); -Property ListenSyslog::ParseMessages("Parse Messages", - "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false"); -Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP"); -Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514"); -Relationship ListenSyslog::Success("success", "All files are routed to success"); -Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid"); - -void ListenSyslog::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(RecvBufSize); - properties.insert(MaxSocketBufSize); - properties.insert(MaxConnections); - properties.insert(MaxBatchSize); - properties.insert(MessageDelimiter); - properties.insert(ParseMessages); - properties.insert(Protocol); - properties.insert(Port); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - relationships.insert(Invalid); - setSupportedRelationships(relationships); -} - -void ListenSyslog::startSocketThread() -{ - if (_thread != NULL) - return; - - logger_->log_info("ListenSysLog Socket Thread Start"); - _serverTheadRunning = true; - _thread = new std::thread(run, this); - _thread->detach(); -} - -void ListenSyslog::run(ListenSyslog *process) -{ - process->runThread(); -} - -void ListenSyslog::runThread() -{ - while (_serverTheadRunning) - { - if (_resetServerSocket) - { - _resetServerSocket = false; - // need to reset the socket - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) - { - int clientSocket = *it; - close(clientSocket); - } - _clientSockets.clear(); - if (_serverSocket > 0) - { - close(_serverSocket); - _serverSocket = 0; - } - } - - if (_serverSocket <= 0) - { - uint16_t portno = _port; - struct sockaddr_in serv_addr; - int sockfd; - if (_protocol == "TCP") - sockfd = socket(AF_INET, SOCK_STREAM, 0); - else - sockfd = socket(AF_INET, SOCK_DGRAM, 0); - if (sockfd < 0) - { - logger_->log_info("ListenSysLog Server socket creation failed"); - break; - } - bzero((char *) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; - serv_addr.sin_port = htons(portno); - if (bind(sockfd, (struct sockaddr *) &serv_addr, - sizeof(serv_addr)) < 0) - { - logger_->log_error("ListenSysLog Server socket bind failed"); - break; - } - if (_protocol == "TCP") - listen(sockfd,5); - _serverSocket = sockfd; - logger_->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno); - } - FD_ZERO(&_readfds); - FD_SET(_serverSocket, &_readfds); - _maxFds = _serverSocket; - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) - { - int clientSocket = *it; - if (clientSocket >= _maxFds) - _maxFds = clientSocket; - FD_SET(clientSocket, &_readfds); - } - fd_set fds; - struct timeval tv; - int retval; - fds = _readfds; - tv.tv_sec = 0; - // 100 msec - tv.tv_usec = 100000; - retval = select(_maxFds+1, &fds, NULL, NULL, &tv); - if (retval < 0) - break; - if (retval == 0) - continue; - if (FD_ISSET(_serverSocket, &fds)) - { - // server socket, either we have UDP datagram or TCP connection request - if (_protocol == "TCP") - { - socklen_t clilen; - struct sockaddr_in cli_addr; - clilen = sizeof(cli_addr); - int newsockfd = accept(_serverSocket, - (struct sockaddr *) &cli_addr, - &clilen); - if (newsockfd > 0) - { - if (_clientSockets.size() < _maxConnections) - { - _clientSockets.push_back(newsockfd); - logger_->log_info("ListenSysLog new client socket %d connection", newsockfd); - continue; - } - else - { - close(newsockfd); - } - } - } - else - { - socklen_t clilen; - struct sockaddr_in cli_addr; - clilen = sizeof(cli_addr); - int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, - (struct sockaddr *)&cli_addr, &clilen); - if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize) - { - uint8_t *payload = new uint8_t[recvlen]; - memcpy(payload, _buffer, recvlen); - putEvent(payload, recvlen); - } - } - } - it = _clientSockets.begin(); - while (it != _clientSockets.end()) - { - int clientSocket = *it; - if (FD_ISSET(clientSocket, &fds)) - { - int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer)); - if (recvlen <= 0) - { - close(clientSocket); - logger_->log_info("ListenSysLog client socket %d close", clientSocket); - it = _clientSockets.erase(it); - } - else - { - if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) - { - uint8_t *payload = new uint8_t[recvlen]; - memcpy(payload, _buffer, recvlen); - putEvent(payload, recvlen); - } - ++it; - } - } - } - } - return; -} - - -int ListenSyslog::readline( int fd, char *bufptr, size_t len ) -{ - char *bufx = bufptr; - static char *bp; - static int cnt = 0; - static char b[ 2048 ]; - char c; - - while ( --len > 0 ) - { - if ( --cnt <= 0 ) - { - cnt = recv( fd, b, sizeof( b ), 0 ); - if ( cnt < 0 ) - { - if ( errno == EINTR ) - { - len++; /* the while will decrement */ - continue; - } - return -1; - } - if ( cnt == 0 ) - return 0; - bp = b; - } - c = *bp++; - *bufptr++ = c; - if ( c == '\n' ) - { - *bufptr = '\n'; - return bufptr - bufx + 1; - } - } - return -1; -} - -void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string value; - bool needResetServerSocket = false; - if (context->getProperty(Protocol.getName(), value)) - { - if (_protocol != value) - needResetServerSocket = true; - _protocol = value; - } - if (context->getProperty(RecvBufSize.getName(), value)) - { - Property::StringToInt(value, _recvBufSize); - } - if (context->getProperty(MaxSocketBufSize.getName(), value)) - { - Property::StringToInt(value, _maxSocketBufSize); - } - if (context->getProperty(MaxConnections.getName(), value)) - { - Property::StringToInt(value, _maxConnections); - } - if (context->getProperty(MessageDelimiter.getName(), value)) - { - _messageDelimiter = value; - } - if (context->getProperty(ParseMessages.getName(), value)) - { - StringUtils::StringToBool(value, _parseMessages); - } - if (context->getProperty(Port.getName(), value)) - { - int64_t oldPort = _port; - Property::StringToInt(value, _port); - if (_port != oldPort) - needResetServerSocket = true; - } - if (context->getProperty(MaxBatchSize.getName(), value)) - { - Property::StringToInt(value, _maxBatchSize); - } - - if (needResetServerSocket) - _resetServerSocket = true; - - startSocketThread(); - - // read from the event queue - if (isEventQueueEmpty()) - { - context->yield(); - return; - } - - std::queue<SysLogEvent> eventQueue; - pollEvent(eventQueue, _maxBatchSize); - bool firstEvent = true; - FlowFileRecord *flowFile = NULL; - while(!eventQueue.empty()) - { - SysLogEvent event = eventQueue.front(); - eventQueue.pop(); - if (firstEvent) - { - flowFile = session->create(); - if (!flowFile) - return; - ListenSyslog::WriteCallback callback((char *)event.payload, event.len); - session->write(flowFile, &callback); - delete[] event.payload; - firstEvent = false; - } - else - { - ListenSyslog::WriteCallback callback((char *)event.payload, event.len); - session->append(flowFile, &callback); - delete[] event.payload; - } - } - flowFile->addAttribute("syslog.protocol", _protocol); - flowFile->addAttribute("syslog.port", std::to_string(_port)); - session->transfer(flowFile, Success); -}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/LogAppenders.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/LogAppenders.cpp b/libminifi/src/LogAppenders.cpp deleted file mode 100644 index c90588d..0000000 --- a/libminifi/src/LogAppenders.cpp +++ /dev/null @@ -1,25 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "../include/LogAppenders.h" - -const char *OutputStreamAppender::nifi_log_output_stream_error_stderr="nifi.log.outputstream.appender.error.stderr"; - -const char *RollingAppender::nifi_log_rolling_apender_file = "nifi.log.rolling.appender.file"; -const char *RollingAppender::nifi_log_rolling_appender_max_files = "nifi.log.rolling.appender.max.files"; -const char *RollingAppender::nifi_log_rolling_appender_max_file_size = "nifi.log.rolling.appender.max.file_size"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/LogAttribute.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/LogAttribute.cpp b/libminifi/src/LogAttribute.cpp deleted file mode 100644 index 345eb69..0000000 --- a/libminifi/src/LogAttribute.cpp +++ /dev/null @@ -1,159 +0,0 @@ -/** - * @file LogAttribute.cpp - * LogAttribute class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <sstream> -#include <string.h> -#include <iostream> - -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" -#include "LogAttribute.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string LogAttribute::ProcessorName("LogAttribute"); -Property LogAttribute::LogLevel("Log Level", "The Log Level to use when logging the Attributes", "info"); -Property LogAttribute::AttributesToLog("Attributes to Log", "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.", ""); -Property LogAttribute::AttributesToIgnore("Attributes to Ignore", "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.", ""); -Property LogAttribute::LogPayload("Log Payload", - "If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.", "false"); -Property LogAttribute::LogPrefix("Log prefix", - "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.", ""); -Relationship LogAttribute::Success("success", "success operational on the flow record"); - -void LogAttribute::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(LogLevel); - properties.insert(AttributesToLog); - properties.insert(AttributesToIgnore); - properties.insert(LogPayload); - properties.insert(LogPrefix); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string dashLine = "--------------------------------------------------"; - LogAttrLevel level = LogAttrLevelInfo; - bool logPayload = false; - std::ostringstream message; - - FlowFileRecord *flow = session->get(); - - if (!flow) - return; - - std::string value; - if (context->getProperty(LogLevel.getName(), value)) - { - logLevelStringToEnum(value, level); - } - if (context->getProperty(LogPrefix.getName(), value)) - { - dashLine = "-----" + value + "-----"; - } - if (context->getProperty(LogPayload.getName(), value)) - { - StringUtils::StringToBool(value, logPayload); - } - - message << "Logging for flow file " << "\n"; - message << dashLine; - message << "\nStandard FlowFile Attributes"; - message << "\n" << "UUID:" << flow->getUUIDStr(); - message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate()); - message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate()); - message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset(); - message << "\nFlowFile Attributes Map Content"; - std::map<std::string, std::string> attrs = flow->getAttributes(); - std::map<std::string, std::string>::iterator it; - for (it = attrs.begin(); it!= attrs.end(); it++) - { - message << "\n" << "key:" << it->first << " value:" << it->second; - } - message << "\nFlowFile Resource Claim Content"; - ResourceClaim *claim = flow->getResourceClaim(); - if (claim) - { - message << "\n" << "Content Claim:" << claim->getContentFullPath(); - } - if (logPayload && flow->getSize() <= 1024*1024) - { - message << "\n" << "Payload:" << "\n"; - ReadCallback callback(flow->getSize()); - session->read(flow, &callback); - for (unsigned int i = 0, j = 0; i < callback._readSize; i++) - { - char temp[8]; - sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i])); - message << temp; - j++; - if (j == 16) - { - message << '\n'; - j = 0; - } - } - } - message << "\n" << dashLine << std::ends; - std::string output = message.str(); - - switch (level) - { - case LogAttrLevelInfo: - logger_->log_info("%s", output.c_str()); - break; - case LogAttrLevelDebug: - logger_->log_debug("%s", output.c_str()); - break; - case LogAttrLevelError: - logger_->log_error("%s", output.c_str()); - break; - case LogAttrLevelTrace: - logger_->log_trace("%s", output.c_str()); - break; - case LogAttrLevelWarn: - logger_->log_warn("%s", output.c_str()); - break; - default: - break; - } - - // Test Import - /* - FlowFileRecord *importRecord = session->create(); - session->import(claim->getContentFullPath(), importRecord); - session->transfer(importRecord, Success); */ - - - // Transfer to the relationship - session->transfer(flow, Success); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Logger.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Logger.cpp b/libminifi/src/Logger.cpp deleted file mode 100644 index e90667d..0000000 --- a/libminifi/src/Logger.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/** - * @file Logger.cpp - * Logger class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "../include/Logger.h" - -#include <vector> -#include <queue> -#include <map> - - -std::shared_ptr<Logger> Logger::singleton_logger_(nullptr); - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp deleted file mode 100644 index 7e3527e..0000000 --- a/libminifi/src/ProcessGroup.cpp +++ /dev/null @@ -1,307 +0,0 @@ -/** - * @file ProcessGroup.cpp - * ProcessGroup class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> - -#include "ProcessGroup.h" -#include "Processor.h" - -ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, - ProcessGroup *parent) : - name_(name), type_(type), parent_process_group_(parent) { - if (!uuid) - // Generate the global UUID for the flow record - uuid_generate(uuid_); - else - uuid_copy(uuid_, uuid); - - yield_period_msec_ = 0; - transmitting_ = false; - - logger_ = Logger::getLogger(); - logger_->log_info("ProcessGroup %s created", name_.c_str()); -} - -ProcessGroup::~ProcessGroup() { - for (std::set<Connection *>::iterator it = connections_.begin(); - it != connections_.end(); ++it) { - Connection *connection = *it; - connection->drain(); - delete connection; - } - - for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); - it != child_process_groups_.end(); ++it) { - ProcessGroup *processGroup(*it); - delete processGroup; - } - - for (std::set<Processor *>::iterator it = processors_.begin(); - it != processors_.end(); ++it) { - Processor *processor(*it); - delete processor; - } -} - -void ProcessGroup::getConnections(std::map<std::string, Connection*> *connectionMap) -{ - for (auto connection : connections_) - { - (*connectionMap)[connection->getUUIDStr()] = connection; - } - - for (auto processGroup: child_process_groups_) { - processGroup->getConnections(connectionMap); - } -} - -bool ProcessGroup::isRootProcessGroup() { - std::lock_guard<std::mutex> lock(mtx_); - return (type_ == ROOT_PROCESS_GROUP); -} - -void ProcessGroup::addProcessor(Processor *processor) { - std::lock_guard<std::mutex> lock(mtx_); - - if (processors_.find(processor) == processors_.end()) { - // We do not have the same processor in this process group yet - processors_.insert(processor); - logger_->log_info("Add processor %s into process group %s", - processor->getName().c_str(), name_.c_str()); - } -} - -void ProcessGroup::removeProcessor(Processor *processor) { - std::lock_guard<std::mutex> lock(mtx_); - - if (processors_.find(processor) != processors_.end()) { - // We do have the same processor in this process group yet - processors_.erase(processor); - logger_->log_info("Remove processor %s from process group %s", - processor->getName().c_str(), name_.c_str()); - } -} - -void ProcessGroup::addProcessGroup(ProcessGroup *child) { - std::lock_guard<std::mutex> lock(mtx_); - - if (child_process_groups_.find(child) == child_process_groups_.end()) { - // We do not have the same child process group in this process group yet - child_process_groups_.insert(child); - logger_->log_info("Add child process group %s into process group %s", - child->getName().c_str(), name_.c_str()); - } -} - -void ProcessGroup::removeProcessGroup(ProcessGroup *child) { - std::lock_guard<std::mutex> lock(mtx_); - - if (child_process_groups_.find(child) != child_process_groups_.end()) { - // We do have the same child process group in this process group yet - child_process_groups_.erase(child); - logger_->log_info("Remove child process group %s from process group %s", - child->getName().c_str(), name_.c_str()); - } -} - -void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, - EventDrivenSchedulingAgent *eventScheduler) { - std::lock_guard<std::mutex> lock(mtx_); - - try { - // Start all the processor node, input and output ports - for (auto processor : processors_) { - logger_->log_debug("Starting %s", processor->getName().c_str()); - - if (!processor->isRunning() - && processor->getScheduledState() != DISABLED) { - if (processor->getSchedulingStrategy() == TIMER_DRIVEN) - timeScheduler->schedule(processor); - else if (processor->getSchedulingStrategy() == EVENT_DRIVEN) - eventScheduler->schedule(processor); - } - } - // Start processing the group - for (auto processGroup : child_process_groups_) { - processGroup->startProcessing(timeScheduler, eventScheduler); - } - } catch (std::exception &exception) { - logger_->log_debug("Caught Exception %s", exception.what()); - throw; - } catch (...) { - logger_->log_debug( - "Caught Exception during process group start processing"); - throw; - } -} - -void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, - EventDrivenSchedulingAgent *eventScheduler) { - std::lock_guard<std::mutex> lock(mtx_); - - try { - // Stop all the processor node, input and output ports - for (std::set<Processor *>::iterator it = processors_.begin(); - it != processors_.end(); ++it) { - Processor *processor(*it); - if (processor->getSchedulingStrategy() == TIMER_DRIVEN) - timeScheduler->unschedule(processor); - else if (processor->getSchedulingStrategy() == EVENT_DRIVEN) - eventScheduler->unschedule(processor); - } - - for (std::set<ProcessGroup *>::iterator it = - child_process_groups_.begin(); it != child_process_groups_.end(); - ++it) { - ProcessGroup *processGroup(*it); - processGroup->stopProcessing(timeScheduler, eventScheduler); - } - } catch (std::exception &exception) { - logger_->log_debug("Caught Exception %s", exception.what()); - throw; - } catch (...) { - logger_->log_debug( - "Caught Exception during process group stop processing"); - throw; - } -} - -Processor *ProcessGroup::findProcessor(uuid_t uuid) { - - Processor *ret = NULL; - // std::lock_guard<std::mutex> lock(_mtx); - - for(auto processor : processors_){ - logger_->log_info("find processor %s", processor->getName().c_str()); - uuid_t processorUUID; - - if (processor->getUUID(processorUUID)) { - - char uuid_str[37]; // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0" - uuid_unparse_lower(processorUUID, uuid_str); - std::string processorUUIDstr = uuid_str; - uuid_unparse_lower(uuid, uuid_str); - std::string uuidStr = uuid_str; - if (processorUUIDstr == uuidStr) { - return processor; - } - } - - } - for(auto processGroup : child_process_groups_){ - - logger_->log_info("find processor child %s", - processGroup->getName().c_str()); - Processor *processor = processGroup->findProcessor(uuid); - if (processor) - return processor; - } - - return ret; -} - -Processor *ProcessGroup::findProcessor(std::string processorName) { - Processor *ret = NULL; - - for(auto processor : processors_){ - logger_->log_debug("Current processor is %s", - processor->getName().c_str()); - if (processor->getName() == processorName) - return processor; - } - - for(auto processGroup : child_process_groups_){ - Processor *processor = processGroup->findProcessor(processorName); - if (processor) - return processor; - } - - return ret; -} - -void ProcessGroup::updatePropertyValue(std::string processorName, - std::string propertyName, std::string propertyValue) { - std::lock_guard<std::mutex> lock(mtx_); - - for(auto processor : processors_){ - if (processor->getName() == processorName) { - processor->setProperty(propertyName, propertyValue); - } - } - - for(auto processGroup : child_process_groups_){ - processGroup->updatePropertyValue(processorName, propertyName, - propertyValue); - } - - return; -} - -void ProcessGroup::addConnection(Connection *connection) { - std::lock_guard<std::mutex> lock(mtx_); - - if (connections_.find(connection) == connections_.end()) { - // We do not have the same connection in this process group yet - connections_.insert(connection); - logger_->log_info("Add connection %s into process group %s", - connection->getName().c_str(), name_.c_str()); - uuid_t sourceUUID; - Processor *source = NULL; - connection->getSourceProcessorUUID(sourceUUID); - source = this->findProcessor(sourceUUID); - if (source) - source->addConnection(connection); - Processor *destination = NULL; - uuid_t destinationUUID; - connection->getDestinationProcessorUUID(destinationUUID); - destination = this->findProcessor(destinationUUID); - if (destination && destination != source) - destination->addConnection(connection); - } -} - -void ProcessGroup::removeConnection(Connection *connection) { - std::lock_guard<std::mutex> lock(mtx_); - - if (connections_.find(connection) != connections_.end()) { - // We do not have the same connection in this process group yet - connections_.erase(connection); - logger_->log_info("Remove connection %s into process group %s", - connection->getName().c_str(), name_.c_str()); - uuid_t sourceUUID; - Processor *source = NULL; - connection->getSourceProcessorUUID(sourceUUID); - source = this->findProcessor(sourceUUID); - if (source) - source->removeConnection(connection); - Processor *destination = NULL; - uuid_t destinationUUID; - connection->getDestinationProcessorUUID(destinationUUID); - destination = this->findProcessor(destinationUUID); - if (destination && destination != source) - destination->removeConnection(connection); - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp deleted file mode 100644 index 3b3eb64..0000000 --- a/libminifi/src/ProcessSession.cpp +++ /dev/null @@ -1,790 +0,0 @@ -/** - * @file ProcessSession.cpp - * ProcessSession class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <iostream> - -#include "ProcessSession.h" -#include "FlowController.h" - -ProcessSession::ProcessSession(ProcessContext *processContext) : _processContext(processContext) { - logger_ = Logger::getLogger(); - logger_->log_trace("ProcessSession created for %s", _processContext->getProcessor()->getName().c_str()); - _provenanceReport = NULL; - if (FlowControllerFactory::getFlowController()->getProvenanceRepository()->isEnable()) - { - _provenanceReport = new ProvenanceReporter(_processContext->getProcessor()->getUUIDStr(), - _processContext->getProcessor()->getName()); - } -} - -FlowFileRecord* ProcessSession::create() -{ - std::map<std::string, std::string> empty; - FlowFileRecord *record = new FlowFileRecord(empty); - - if (record) - { - _addedFlowFiles[record->getUUIDStr()] = record; - logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str()); - std::string details = _processContext->getProcessor()->getName() + " creates flow record " + record->getUUIDStr(); - if (_provenanceReport) - _provenanceReport->create(record, details); - } - - return record; -} - -FlowFileRecord* ProcessSession::create(FlowFileRecord *parent) -{ - std::map<std::string, std::string> empty; - FlowFileRecord *record = new FlowFileRecord(empty); - - if (record) - { - _addedFlowFiles[record->getUUIDStr()] = record; - logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str()); - } - - if (record) - { - // Copy attributes - std::map<std::string, std::string> parentAttributes = parent->getAttributes(); - std::map<std::string, std::string>::iterator it; - for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++) - { - if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || - it->first == FlowAttributeKey(DISCARD_REASON) || - it->first == FlowAttributeKey(UUID)) - // Do not copy special attributes from parent - continue; - record->setAttribute(it->first, it->second); - } - record->_lineageStartDate = parent->_lineageStartDate; - record->_lineageIdentifiers = parent->_lineageIdentifiers; - record->_lineageIdentifiers.insert(parent->_uuidStr); - - } - return record; -} - -FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent) -{ - FlowFileRecord *record = this->create(parent); - if (record) - { - // Copy Resource Claim - record->_claim = parent->_claim; - if (record->_claim) - { - record->_offset = parent->_offset; - record->_size = parent->_size; - record->_claim->increaseFlowFileRecordOwnedCount(); - } - if (_provenanceReport) - _provenanceReport->clone(parent, record); - } - return record; -} - -FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent) -{ - std::map<std::string, std::string> empty; - FlowFileRecord *record = new FlowFileRecord(empty); - - if (record) - { - this->_clonedFlowFiles[record->getUUIDStr()] = record; - logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr().c_str()); - // Copy attributes - std::map<std::string, std::string> parentAttributes = parent->getAttributes(); - std::map<std::string, std::string>::iterator it; - for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++) - { - if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || - it->first == FlowAttributeKey(DISCARD_REASON) || - it->first == FlowAttributeKey(UUID)) - // Do not copy special attributes from parent - continue; - record->setAttribute(it->first, it->second); - } - record->_lineageStartDate = parent->_lineageStartDate; - record->_lineageIdentifiers = parent->_lineageIdentifiers; - record->_lineageIdentifiers.insert(parent->_uuidStr); - - // Copy Resource Claim - record->_claim = parent->_claim; - if (record->_claim) - { - record->_offset = parent->_offset; - record->_size = parent->_size; - record->_claim->increaseFlowFileRecordOwnedCount(); - } - if (_provenanceReport) - _provenanceReport->clone(parent, record); - } - - return record; -} - -FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long size) -{ - FlowFileRecord *record = this->create(parent); - if (record) - { - if (parent->_claim) - { - if ((offset + size) > (long) parent->_size) - { - // Set offset and size - logger_->log_error("clone offset %d and size %d exceed parent size %d", - offset, size, parent->_size); - // Remove the Add FlowFile for the session - std::map<std::string, FlowFileRecord *>::iterator it = - this->_addedFlowFiles.find(record->getUUIDStr()); - if (it != this->_addedFlowFiles.end()) - this->_addedFlowFiles.erase(record->getUUIDStr()); - delete record; - return NULL; - } - record->_offset = parent->_offset + parent->_offset; - record->_size = size; - // Copy Resource Claim - record->_claim = parent->_claim; - record->_claim->increaseFlowFileRecordOwnedCount(); - } - if (_provenanceReport) - _provenanceReport->clone(parent, record); - } - return record; -} - -void ProcessSession::remove(FlowFileRecord *flow) -{ - flow->_markedDelete = true; - _deletedFlowFiles[flow->getUUIDStr()] = flow; - std::string reason = _processContext->getProcessor()->getName() + " drop flow record " + flow->getUUIDStr(); - if (_provenanceReport) - _provenanceReport->drop(flow, reason); -} - -void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value) -{ - flow->setAttribute(key, value); - std::string details = _processContext->getProcessor()->getName() + " modify flow record " + flow->getUUIDStr() + - " attribute " + key + ":" + value; - if (_provenanceReport) - _provenanceReport->modifyAttributes(flow, details); -} - -void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key) -{ - flow->removeAttribute(key); - std::string details = _processContext->getProcessor()->getName() + " remove flow record " + flow->getUUIDStr() + - " attribute " + key; - if (_provenanceReport) - _provenanceReport->modifyAttributes(flow, details); -} - -void ProcessSession::penalize(FlowFileRecord *flow) -{ - flow->_penaltyExpirationMs = getTimeMillis() + this->_processContext->getProcessor()->getPenalizationPeriodMsec(); -} - -void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship) -{ - _transferRelationship[flow->getUUIDStr()] = relationship; -} - -void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback) -{ - ResourceClaim *claim = NULL; - - claim = new ResourceClaim(); - - try - { - std::ofstream fs; - uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - if (fs.is_open()) - { - // Call the callback to write the content - callback->process(&fs); - if (fs.good() && fs.tellp() >= 0) - { - flow->_size = fs.tellp(); - flow->_offset = 0; - if (flow->_claim) - { - // Remove the old claim - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - flow->_claim = claim; - claim->increaseFlowFileRecordOwnedCount(); - /* - logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", - flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - std::string details = _processContext->getProcessor()->getName() + " modify flow record content " + flow->getUUIDStr(); - uint64_t endTime = getTimeMillis(); - if (_provenanceReport) - _provenanceReport->modifyContent(flow, details, endTime - startTime); - } - else - { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); - } - } - else - { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); - } - } - catch (std::exception &exception) - { - if (flow && flow->_claim == claim) - { - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - if (claim) - delete claim; - logger_->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - if (flow && flow->_claim == claim) - { - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - if (claim) - delete claim; - logger_->log_debug("Caught Exception during process session write"); - throw; - } -} - -void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback) -{ - ResourceClaim *claim = NULL; - - if (flow->_claim == NULL) - { - // No existed claim for append, we need to create new claim - return write(flow, callback); - } - - claim = flow->_claim; - - try - { - std::ofstream fs; - uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app); - if (fs.is_open()) - { - // Call the callback to write the content - std::streampos oldPos = fs.tellp(); - callback->process(&fs); - if (fs.good() && fs.tellp() >= 0) - { - uint64_t appendSize = fs.tellp() - oldPos; - flow->_size += appendSize; - /* - logger_->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", - flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - std::string details = _processContext->getProcessor()->getName() + " modify flow record content " + flow->getUUIDStr(); - uint64_t endTime = getTimeMillis(); - if (_provenanceReport) - _provenanceReport->modifyContent(flow, details, endTime - startTime); - } - else - { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); - } - } - else - { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); - } - } - catch (std::exception &exception) - { - logger_->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - logger_->log_debug("Caught Exception during process session append"); - throw; - } -} - -void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback) -{ - try - { - ResourceClaim *claim = NULL; - if (flow->_claim == NULL) - { - // No existed claim for read, we throw exception - throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read"); - } - - claim = flow->_claim; - std::ifstream fs; - fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary); - if (fs.is_open()) - { - fs.seekg(flow->_offset, fs.beg); - - if (fs.good()) - { - callback->process(&fs); - /* - logger_->log_debug("Read offset %d size %d content %s for FlowFile UUID %s", - flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - } - else - { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error"); - } - } - else - { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); - } - } - catch (std::exception &exception) - { - logger_->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - logger_->log_debug("Caught Exception during process session read"); - throw; - } -} - -void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepSource, uint64_t offset) -{ - ResourceClaim *claim = NULL; - - claim = new ResourceClaim(); - char *buf = NULL; - int size = 4096; - buf = new char [size]; - - try - { - std::ofstream fs; - uint64_t startTime = getTimeMillis(); - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - std::ifstream input; - input.open(source.c_str(), std::fstream::in | std::fstream::binary); - - if (fs.is_open() && input.is_open()) - { - // Open the source file and stream to the flow file - input.seekg(offset, fs.beg); - while (input.good()) - { - input.read(buf, size); - if (input) - fs.write(buf, size); - else - fs.write(buf, input.gcount()); - } - - if (fs.good() && fs.tellp() >= 0) - { - flow->_size = fs.tellp(); - flow->_offset = 0; - if (flow->_claim) - { - // Remove the old claim - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - flow->_claim = claim; - claim->increaseFlowFileRecordOwnedCount(); - - logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", - flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); - - fs.close(); - input.close(); - if (!keepSource) - std::remove(source.c_str()); - std::string details = _processContext->getProcessor()->getName() + " modify flow record content " + flow->getUUIDStr(); - uint64_t endTime = getTimeMillis(); - if (_provenanceReport) - _provenanceReport->modifyContent(flow, details, endTime - startTime); - } - else - { - fs.close(); - input.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); - } - } - else - { - throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); - } - - delete[] buf; - } - catch (std::exception &exception) - { - if (flow && flow->_claim == claim) - { - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - if (claim) - delete claim; - logger_->log_debug("Caught Exception %s", exception.what()); - delete[] buf; - throw; - } - catch (...) - { - if (flow && flow->_claim == claim) - { - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - if (claim) - delete claim; - logger_->log_debug("Caught Exception during process session write"); - delete[] buf; - throw; - } -} - -void ProcessSession::commit() -{ - try - { - // First we clone the flow record based on the transfered relationship for updated flow record - for (auto && it : _updatedFlowFiles) - { - FlowFileRecord *record = it.second; - if (record->_markedDelete) - continue; - std::map<std::string, Relationship>::iterator itRelationship = - this->_transferRelationship.find(record->getUUIDStr()); - if (itRelationship != _transferRelationship.end()) - { - Relationship relationship = itRelationship->second; - // Find the relationship, we need to find the connections for that relationship - std::set<Connection *> connections = - _processContext->getProcessor()->getOutGoingConnections(relationship.getName()); - if (connections.empty()) - { - // No connection - if (!_processContext->getProcessor()->isAutoTerminated(relationship)) - { - // Not autoterminate, we should have the connect - std::string message = "Connect empty for non auto terminated relationship" + relationship.getName(); - throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); - } - else - { - // Autoterminated - remove(record); - } - } - else - { - // We connections, clone the flow and assign the connection accordingly - for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) - { - Connection *connection(*itConnection); - if (itConnection == connections.begin()) - { - // First connection which the flow need be routed to - record->_connection = connection; - } - else - { - // Clone the flow file and route to the connection - FlowFileRecord *cloneRecord; - cloneRecord = this->cloneDuringTransfer(record); - if (cloneRecord) - cloneRecord->_connection = connection; - else - throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer"); - } - } - } - } - else - { - // Can not find relationship for the flow - throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); - } - } - // Do the samething for added flow file - for(const auto it : _addedFlowFiles) - { - FlowFileRecord *record = it.second; - if (record->_markedDelete) - continue; - std::map<std::string, Relationship>::iterator itRelationship = - this->_transferRelationship.find(record->getUUIDStr()); - if (itRelationship != _transferRelationship.end()) - { - Relationship relationship = itRelationship->second; - // Find the relationship, we need to find the connections for that relationship - std::set<Connection *> connections = - _processContext->getProcessor()->getOutGoingConnections(relationship.getName()); - if (connections.empty()) - { - // No connection - if (!_processContext->getProcessor()->isAutoTerminated(relationship)) - { - // Not autoterminate, we should have the connect - std::string message = "Connect empty for non auto terminated relationship " + relationship.getName(); - throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); - } - else - { - // Autoterminated - remove(record); - } - } - else - { - // We connections, clone the flow and assign the connection accordingly - for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) - { - Connection *connection(*itConnection); - if (itConnection == connections.begin()) - { - // First connection which the flow need be routed to - record->_connection = connection; - } - else - { - // Clone the flow file and route to the connection - FlowFileRecord *cloneRecord; - cloneRecord = this->cloneDuringTransfer(record); - if (cloneRecord) - cloneRecord->_connection = connection; - else - throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer"); - } - } - } - } - else - { - // Can not find relationship for the flow - throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); - } - } - // Complete process the added and update flow files for the session, send the flow file to its queue - for(const auto &it : _updatedFlowFiles) - { - FlowFileRecord *record = it.second; - if (record->_markedDelete) - { - continue; - } - if (record->_connection) - record->_connection->put(record); - else - delete record; - } - for(const auto &it : _addedFlowFiles) - { - FlowFileRecord *record = it.second; - if (record->_markedDelete) - { - continue; - } - if (record->_connection) - record->_connection->put(record); - else - delete record; - } - // Process the clone flow files - for(const auto &it : _clonedFlowFiles) - { - FlowFileRecord *record = it.second; - if (record->_markedDelete) - { - continue; - } - if (record->_connection) - record->_connection->put(record); - else - delete record; - } - // Delete the deleted flow files - for(const auto &it : _deletedFlowFiles) - { - FlowFileRecord *record = it.second; - delete record; - } - // Delete the snapshot - for(const auto &it : _originalFlowFiles) - { - FlowFileRecord *record = it.second; - delete record; - } - // All done - _updatedFlowFiles.clear(); - _addedFlowFiles.clear(); - _clonedFlowFiles.clear(); - _deletedFlowFiles.clear(); - _originalFlowFiles.clear(); - // persistent the provenance report - if (this->_provenanceReport) - this->_provenanceReport->commit(); - logger_->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str()); - } - catch (std::exception &exception) - { - logger_->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - logger_->log_debug("Caught Exception during process session commit"); - throw; - } -} - - -void ProcessSession::rollback() -{ - try - { - // Requeue the snapshot of the flowfile back - for(const auto &it : _originalFlowFiles) - { - FlowFileRecord *record = it.second; - if (record->_orginalConnection) - { - record->_snapshot = false; - record->_orginalConnection->put(record); - } - else - delete record; - } - _originalFlowFiles.clear(); - // Process the clone flow files - for(const auto &it : _clonedFlowFiles) - { - FlowFileRecord *record = it.second; - delete record; - } - _clonedFlowFiles.clear(); - for(const auto &it : _addedFlowFiles) - { - FlowFileRecord *record = it.second; - delete record; - } - _addedFlowFiles.clear(); - for(const auto &it : _updatedFlowFiles) - { - FlowFileRecord *record = it.second; - delete record; - } - _updatedFlowFiles.clear(); - _deletedFlowFiles.clear(); - logger_->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str()); - } - catch (std::exception &exception) - { - logger_->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - logger_->log_debug("Caught Exception during process session roll back"); - throw; - } -} - -FlowFileRecord *ProcessSession::get() -{ - Connection *first = _processContext->getProcessor()->getNextIncomingConnection(); - - if (first == NULL) - return NULL; - - Connection *current = first; - - do - { - std::set<FlowFileRecord *> expired; - FlowFileRecord *ret = current->poll(expired); - if (expired.size() > 0) - { - // Remove expired flow record - for (std::set<FlowFileRecord *>::iterator it = expired.begin(); it != expired.end(); ++it) - { - FlowFileRecord *record = *it; - std::string details = _processContext->getProcessor()->getName() + " expire flow record " + record->getUUIDStr(); - if (_provenanceReport) - _provenanceReport->expire(record, details); - delete (record); - } - } - if (ret) - { - // add the flow record to the current process session update map - ret->_markedDelete = false; - _updatedFlowFiles[ret->getUUIDStr()] = ret; - std::map<std::string, std::string> empty; - FlowFileRecord *snapshot = new FlowFileRecord(empty); - logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str()); - snapshot->duplicate(ret); - // save a snapshot - _originalFlowFiles[snapshot->getUUIDStr()] = snapshot; - return ret; - } - current = _processContext->getProcessor()->getNextIncomingConnection(); - } - while (current != NULL && current != first); - - return NULL; -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ProcessSessionFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ProcessSessionFactory.cpp b/libminifi/src/ProcessSessionFactory.cpp deleted file mode 100644 index a105b1c..0000000 --- a/libminifi/src/ProcessSessionFactory.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/** - * @file ProcessSessionFactory.cpp - * ProcessSessionFactory class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "ProcessSessionFactory.h" - -#include <memory> - -std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession() -{ - return std::unique_ptr<ProcessSession>(new ProcessSession(_processContext)); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp deleted file mode 100644 index 94aaa20..0000000 --- a/libminifi/src/Processor.cpp +++ /dev/null @@ -1,526 +0,0 @@ -/** - * @file Processor.cpp - * Processor class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <memory> -#include <functional> - -#include "Processor.h" -#include "ProcessContext.h" -#include "ProcessSession.h" -#include "ProcessSessionFactory.h" - -Processor::Processor(std::string name, uuid_t uuid) -: _name(name) -{ - if (!uuid) - // Generate the global UUID for the flow record - uuid_generate(_uuid); - else - uuid_copy(_uuid, uuid); - - char uuidStr[37]; - uuid_unparse_lower(_uuid, uuidStr); - _uuidStr = uuidStr; - _hasWork.store(false); - // Setup the default values - _state = DISABLED; - _strategy = TIMER_DRIVEN; - _lossTolerant = false; - _triggerWhenEmpty = false; - _schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS; - _runDurantionNano = 0; - _yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000; - _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000; - _maxConcurrentTasks = 1; - _activeTasks = 0; - _yieldExpiration = 0; - _incomingConnectionsIter = this->_incomingConnections.begin(); - logger_ = Logger::getLogger(); - logger_->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str()); -} - -Processor::~Processor() -{ - -} - -bool Processor::isRunning() -{ - return (_state == RUNNING && _activeTasks > 0); -} - -void Processor::setScheduledState(ScheduledState state) -{ - _state = state; -} - -bool Processor::setSupportedProperties(std::set<Property> properties) -{ - if (isRunning()) - { - logger_->log_info("Can not set processor property while the process %s is running", - _name.c_str()); - return false; - } - - std::lock_guard<std::mutex> lock(_mtx); - - _properties.clear(); - for (auto item : properties) - { - _properties[item.getName()] = item; - logger_->log_info("Processor %s supported property name %s", _name.c_str(), item.getName().c_str()); - } - - return true; -} - -bool Processor::setSupportedRelationships(std::set<Relationship> relationships) -{ - if (isRunning()) - { - logger_->log_info("Can not set processor supported relationship while the process %s is running", - _name.c_str()); - return false; - } - - std::lock_guard<std::mutex> lock(_mtx); - - _relationships.clear(); - for(auto item : relationships) - { - _relationships[item.getName()] = item; - logger_->log_info("Processor %s supported relationship name %s", _name.c_str(), item.getName().c_str()); - } - - return true; -} - -bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationships) -{ - if (isRunning()) - { - logger_->log_info("Can not set processor auto terminated relationship while the process %s is running", - _name.c_str()); - return false; - } - - std::lock_guard<std::mutex> lock(_mtx); - - _autoTerminatedRelationships.clear(); - for(auto item : relationships) - { - _autoTerminatedRelationships[item.getName()] = item; - logger_->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), item.getName().c_str()); - } - - return true; -} - -bool Processor::isAutoTerminated(Relationship relationship) -{ - bool isRun = isRunning(); - - auto conditionalLock = !isRun ? - std::unique_lock<std::mutex>() - : std::unique_lock<std::mutex>(_mtx); - - const auto &it = _autoTerminatedRelationships.find(relationship.getName()); - if (it != _autoTerminatedRelationships.end()) - { - return true; - } - else - { - return false; - } -} - -bool Processor::isSupportedRelationship(Relationship relationship) -{ - bool isRun = isRunning(); - - auto conditionalLock = !isRun ? - std::unique_lock<std::mutex>() - : std::unique_lock<std::mutex>(_mtx); - - const auto &it = _relationships.find(relationship.getName()); - if (it != _relationships.end()) - { - return true; - } - else - { - return false; - } -} - -bool Processor::getProperty(std::string name, std::string &value) -{ - bool isRun = isRunning(); - - - auto conditionalLock = !isRun ? - std::unique_lock<std::mutex>() - : std::unique_lock<std::mutex>(_mtx); - - const auto &it = _properties.find(name); - if (it != _properties.end()) - { - Property item = it->second; - value = item.getValue(); - return true; - } - else - { - return false; - } -} - -bool Processor::setProperty(std::string name, std::string value) -{ - - std::lock_guard<std::mutex> lock(_mtx); - auto &&it = _properties.find(name); - - if (it != _properties.end()) - { - Property item = it->second; - item.setValue(value); - _properties[item.getName()] = item; - logger_->log_info("Processor %s property name %s value %s", _name.c_str(), item.getName().c_str(), value.c_str()); - return true; - } - else - { - return false; - } -} - -bool Processor::setProperty(Property prop, std::string value) { - - std::lock_guard<std::mutex> lock(_mtx); - auto it = _properties.find( - prop.getName()); - - if (it != _properties.end()) { - Property item = it->second; - item.setValue(value); - _properties[item.getName()] = item; - logger_->log_info("Processor %s property name %s value %s", - _name.c_str(), item.getName().c_str(), value.c_str()); - return true; - } else { - Property newProp(prop); - newProp.setValue(value); - _properties.insert( - std::pair<std::string, Property>(prop.getName(), newProp)); - return true; - - return false; - } -} - -std::set<Connection *> Processor::getOutGoingConnections(std::string relationship) -{ - std::set<Connection *> empty; - - auto &&it = _outGoingConnections.find(relationship); - if (it != _outGoingConnections.end()) - { - return _outGoingConnections[relationship]; - } - else - { - return empty; - } -} - -bool Processor::addConnection(Connection *connection) -{ - bool ret = false; - - - if (isRunning()) - { - logger_->log_info("Can not add connection while the process %s is running", - _name.c_str()); - return false; - } - - - std::lock_guard<std::mutex> lock(_mtx); - - uuid_t srcUUID; - uuid_t destUUID; - - connection->getSourceProcessorUUID(srcUUID); - connection->getDestinationProcessorUUID(destUUID); - char uuid_str[37]; - - - uuid_unparse_lower(_uuid, uuid_str); - std::string my_uuid = uuid_str; - uuid_unparse_lower(destUUID, uuid_str); - std::string destination_uuid = uuid_str; - if (my_uuid == destination_uuid) - { - // Connection is destination to the current processor - if (_incomingConnections.find(connection) == _incomingConnections.end()) - { - _incomingConnections.insert(connection); - connection->setDestinationProcessor(this); - logger_->log_info("Add connection %s into Processor %s incoming connection", - connection->getName().c_str(), _name.c_str()); - _incomingConnectionsIter = this->_incomingConnections.begin(); - ret = true; - } - } - uuid_unparse_lower(srcUUID, uuid_str); - std::string source_uuid = uuid_str; - if (my_uuid == source_uuid) - { - std::string relationship = connection->getRelationship().getName(); - // Connection is source from the current processor - auto &&it = - _outGoingConnections.find(relationship); - if (it != _outGoingConnections.end()) - { - // We already has connection for this relationship - std::set<Connection *> existedConnection = it->second; - if (existedConnection.find(connection) == existedConnection.end()) - { - // We do not have the same connection for this relationship yet - existedConnection.insert(connection); - connection->setSourceProcessor(this); - _outGoingConnections[relationship] = existedConnection; - logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", - connection->getName().c_str(), _name.c_str(), relationship.c_str()); - ret = true; - } - } - else - { - - // We do not have any outgoing connection for this relationship yet - std::set<Connection *> newConnection; - newConnection.insert(connection); - connection->setSourceProcessor(this); - _outGoingConnections[relationship] = newConnection; - logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", - connection->getName().c_str(), _name.c_str(), relationship.c_str()); - ret = true; - } - } - - - return ret; -} - -void Processor::removeConnection(Connection *connection) -{ - if (isRunning()) - { - logger_->log_info("Can not remove connection while the process %s is running", - _name.c_str()); - return; - } - - std::lock_guard<std::mutex> lock(_mtx); - - uuid_t srcUUID; - uuid_t destUUID; - - connection->getSourceProcessorUUID(srcUUID); - connection->getDestinationProcessorUUID(destUUID); - - if (uuid_compare(_uuid, destUUID) == 0) - { - // Connection is destination to the current processor - if (_incomingConnections.find(connection) != _incomingConnections.end()) - { - _incomingConnections.erase(connection); - connection->setDestinationProcessor(NULL); - logger_->log_info("Remove connection %s into Processor %s incoming connection", - connection->getName().c_str(), _name.c_str()); - _incomingConnectionsIter = this->_incomingConnections.begin(); - } - } - - if (uuid_compare(_uuid, srcUUID) == 0) - { - std::string relationship = connection->getRelationship().getName(); - // Connection is source from the current processor - auto &&it = - _outGoingConnections.find(relationship); - if (it == _outGoingConnections.end()) - { - return; - } - else - { - if (_outGoingConnections[relationship].find(connection) != _outGoingConnections[relationship].end()) - { - _outGoingConnections[relationship].erase(connection); - connection->setSourceProcessor(NULL); - logger_->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s", - connection->getName().c_str(), _name.c_str(), relationship.c_str()); - } - } - } -} - -Connection *Processor::getNextIncomingConnection() -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_incomingConnections.size() == 0) - return NULL; - - if (_incomingConnectionsIter == _incomingConnections.end()) - _incomingConnectionsIter = _incomingConnections.begin(); - - Connection *ret = *_incomingConnectionsIter; - _incomingConnectionsIter++; - - if (_incomingConnectionsIter == _incomingConnections.end()) - _incomingConnectionsIter = _incomingConnections.begin(); - - return ret; -} - -bool Processor::flowFilesQueued() -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_incomingConnections.size() == 0) - return false; - - for(auto &&connection : _incomingConnections) - { - if (connection->getQueueSize() > 0) - return true; - } - - return false; -} - -bool Processor::flowFilesOutGoingFull() -{ - std::lock_guard<std::mutex> lock(_mtx); - - for(auto &&connection : _outGoingConnections) - { - // We already has connection for this relationship - std::set<Connection *> existedConnection = connection.second; - for(const auto connection : existedConnection) - { - if (connection->isFull()) - return true; - } - } - - return false; -} - -void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory) -{ - auto session = sessionFactory->createSession(); - - try { - // Call the virtual trigger function - onTrigger(context, session.get()); - session->commit(); - } - catch (std::exception &exception) - { - logger_->log_debug("Caught Exception %s", exception.what()); - session->rollback(); - throw; - } - catch (...) - { - logger_->log_debug("Caught Exception Processor::onTrigger"); - session->rollback(); - throw; - } -} - -void Processor::waitForWork(uint64_t timeoutMs) -{ - _hasWork.store( isWorkAvailable() ); - - if (!_hasWork.load()) - { - std::unique_lock<std::mutex> lock(_workAvailableMtx); - _hasWorkCondition.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] { return _hasWork.load(); }); - } - -} - -void Processor::notifyWork() -{ - // Do nothing if we are not event-driven - if (_strategy != EVENT_DRIVEN) - { - return; - } - - { - _hasWork.store( isWorkAvailable() ); - - - if (_hasWork.load()) - { - _hasWorkCondition.notify_one(); - } - } -} - -bool Processor::isWorkAvailable() -{ - // We have work if any incoming connection has work - bool hasWork = false; - - try - { - for (const auto &conn : getIncomingConnections()) - { - if (conn->getQueueSize() > 0) - { - hasWork = true; - break; - } - } - } - catch (...) - { - logger_->log_error("Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!"); - } - - return hasWork; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Provenance.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Provenance.cpp b/libminifi/src/Provenance.cpp deleted file mode 100644 index 58cf730..0000000 --- a/libminifi/src/Provenance.cpp +++ /dev/null @@ -1,566 +0,0 @@ -/** - * @file Provenance.cpp - * Provenance implemenatation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <cstdint> -#include <vector> -#include <arpa/inet.h> -#include "io/DataStream.h" -#include "io/Serializable.h" -#include "Provenance.h" -#include "Relationship.h" -#include "Logger.h" -#include "FlowController.h" - -//! DeSerialize -bool ProvenanceEventRecord::DeSerialize(ProvenanceRepository *repo, - std::string key) { - std::string value; - bool ret; - - ret = repo->Get(key, value); - - if (!ret) { - logger_->log_error("NiFi Provenance Store event %s can not found", - key.c_str()); - return false; - } else - logger_->log_debug("NiFi Provenance Read event %s length %d", - key.c_str(), value.length()); - - - DataStream stream((const uint8_t*)value.data(),value.length()); - - ret = DeSerialize(stream); - - if (ret) { - logger_->log_debug( - "NiFi Provenance retrieve event %s size %d eventType %d success", - _eventIdStr.c_str(), stream.getSize(), _eventType); - } else { - logger_->log_debug( - "NiFi Provenance retrieve event %s size %d eventType %d fail", - _eventIdStr.c_str(), stream.getSize(), _eventType); - } - - return ret; -} - -bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo) { - - DataStream outStream; - - int ret; - - ret = writeUTF(this->_eventIdStr,&outStream); - if (ret <= 0) { - - return false; - } - - uint32_t eventType = this->_eventType; - ret = write(eventType,&outStream); - if (ret != 4) { - - return false; - } - - ret = write(this->_eventTime,&outStream); - if (ret != 8) { - - return false; - } - - ret = write(this->_entryDate,&outStream); - if (ret != 8) { - return false; - } - - ret = write(this->_eventDuration,&outStream); - if (ret != 8) { - - return false; - } - - ret = write(this->_lineageStartDate,&outStream); - if (ret != 8) { - - return false; - } - - ret = writeUTF(this->_componentId,&outStream); - if (ret <= 0) { - - return false; - } - - ret = writeUTF(this->_componentType,&outStream); - if (ret <= 0) { - - return false; - } - - ret = writeUTF(this->_uuid,&outStream); - if (ret <= 0) { - - return false; - } - - ret = writeUTF(this->_details,&outStream); - if (ret <= 0) { - - return false; - } - - // write flow attributes - uint32_t numAttributes = this->_attributes.size(); - ret = write(numAttributes,&outStream); - if (ret != 4) { - - return false; - } - - for (auto itAttribute : _attributes) { - ret = writeUTF(itAttribute.first,&outStream, true); - if (ret <= 0) { - - return false; - } - ret = writeUTF(itAttribute.second,&outStream, true); - if (ret <= 0) { - - return false; - } - } - - ret = writeUTF(this->_contentFullPath,&outStream); - if (ret <= 0) { - - return false; - } - - ret = write(this->_size,&outStream); - if (ret != 8) { - - return false; - } - - ret = write(this->_offset,&outStream); - if (ret != 8) { - - return false; - } - - ret = writeUTF(this->_sourceQueueIdentifier,&outStream); - if (ret <= 0) { - - return false; - } - - if (this->_eventType == ProvenanceEventRecord::FORK - || this->_eventType == ProvenanceEventRecord::CLONE - || this->_eventType == ProvenanceEventRecord::JOIN) { - // write UUIDs - uint32_t number = this->_parentUuids.size(); - ret = write(number,&outStream); - if (ret != 4) { - - return false; - } - for (auto parentUUID : _parentUuids) { - ret = writeUTF(parentUUID,&outStream); - if (ret <= 0) { - - return false; - } - } - number = this->_childrenUuids.size(); - ret = write(number,&outStream); - if (ret != 4) { - return false; - } - for (auto childUUID : _childrenUuids) { - ret = writeUTF(childUUID,&outStream); - if (ret <= 0) { - - return false; - } - } - } else if (this->_eventType == ProvenanceEventRecord::SEND - || this->_eventType == ProvenanceEventRecord::FETCH) { - ret = writeUTF(this->_transitUri,&outStream); - if (ret <= 0) { - - return false; - } - } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { - ret = writeUTF(this->_transitUri,&outStream); - if (ret <= 0) { - - return false; - } - ret = writeUTF(this->_sourceSystemFlowFileIdentifier,&outStream); - if (ret <= 0) { - - return false; - } - } - - // Persistent to the DB - - if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { - logger_->log_debug("NiFi Provenance Store event %s size %d success", - _eventIdStr.c_str(), outStream.getSize()); - return true; - } else { - logger_->log_error("NiFi Provenance Store event %s size %d fail", - _eventIdStr.c_str(), outStream.getSize()); - return false; - } - - // cleanup - - return true; -} - -bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { - - int ret; - - DataStream outStream(buffer,bufferSize); - - ret = readUTF(this->_eventIdStr,&outStream); - - if (ret <= 0) { - return false; - } - - uint32_t eventType; - ret = read(eventType,&outStream); - if (ret != 4) { - return false; - } - this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType; - - ret = read(this->_eventTime,&outStream); - if (ret != 8) { - return false; - } - - ret = read(this->_entryDate,&outStream); - if (ret != 8) { - return false; - } - - ret = read(this->_eventDuration,&outStream); - if (ret != 8) { - return false; - } - - ret = read(this->_lineageStartDate,&outStream); - if (ret != 8) { - return false; - } - - ret = readUTF(this->_componentId,&outStream); - if (ret <= 0) { - return false; - } - - ret = readUTF(this->_componentType,&outStream); - if (ret <= 0) { - return false; - } - - ret = readUTF(this->_uuid,&outStream); - if (ret <= 0) { - return false; - } - - ret = readUTF(this->_details,&outStream); - - if (ret <= 0) { - return false; - } - - // read flow attributes - uint32_t numAttributes = 0; - ret = read(numAttributes,&outStream); - if (ret != 4) { - return false; - } - - for (uint32_t i = 0; i < numAttributes; i++) { - std::string key; - ret = readUTF(key,&outStream, true); - if (ret <= 0) { - return false; - } - std::string value; - ret = readUTF(value,&outStream, true); - if (ret <= 0) { - return false; - } - this->_attributes[key] = value; - } - - ret = readUTF(this->_contentFullPath,&outStream); - if (ret <= 0) { - return false; - } - - ret = read(this->_size,&outStream); - if (ret != 8) { - return false; - } - - ret = read(this->_offset,&outStream); - if (ret != 8) { - return false; - } - - ret = readUTF(this->_sourceQueueIdentifier,&outStream); - if (ret <= 0) { - return false; - } - - if (this->_eventType == ProvenanceEventRecord::FORK - || this->_eventType == ProvenanceEventRecord::CLONE - || this->_eventType == ProvenanceEventRecord::JOIN) { - // read UUIDs - uint32_t number = 0; - ret = read(number,&outStream); - if (ret != 4) { - return false; - } - - - for (uint32_t i = 0; i < number; i++) { - std::string parentUUID; - ret = readUTF(parentUUID,&outStream); - if (ret <= 0) { - return false; - } - this->addParentUuid(parentUUID); - } - number = 0; - ret = read(number,&outStream); - if (ret != 4) { - return false; - } - for (uint32_t i = 0; i < number; i++) { - std::string childUUID; - ret = readUTF(childUUID,&outStream); - if (ret <= 0) { - return false; - } - this->addChildUuid(childUUID); - } - } else if (this->_eventType == ProvenanceEventRecord::SEND - || this->_eventType == ProvenanceEventRecord::FETCH) { - ret = readUTF(this->_transitUri,&outStream); - if (ret <= 0) { - return false; - } - } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { - ret = readUTF(this->_transitUri,&outStream); - if (ret <= 0) { - return false; - } - ret = readUTF(this->_sourceSystemFlowFileIdentifier,&outStream); - if (ret <= 0) { - return false; - } - } - - return true; -} - -void ProvenanceReporter::commit() { - if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isEnable()) - return; - for (auto event : _events) { - if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull()) { - event->Serialize( - FlowControllerFactory::getFlowController()->getProvenanceRepository()); - } else { - logger_->log_debug("Provenance Repository is full"); - } - } -} - -void ProvenanceReporter::create(FlowFileRecord *flow, std::string detail) { - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE, - flow); - - if (event) { - event->setDetails(detail); - add(event); - } -} - -void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation, - std::string detail, uint64_t processingDuration) { - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ROUTE, flow); - - if (event) { - event->setDetails(detail); - event->setRelationship(relation.getName()); - event->setEventDuration(processingDuration); - add(event); - } -} - -void ProvenanceReporter::modifyAttributes(FlowFileRecord *flow, - std::string detail) { - ProvenanceEventRecord *event = allocate( - ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow); - - if (event) { - event->setDetails(detail); - add(event); - } -} - -void ProvenanceReporter::modifyContent(FlowFileRecord *flow, std::string detail, - uint64_t processingDuration) { - ProvenanceEventRecord *event = allocate( - ProvenanceEventRecord::CONTENT_MODIFIED, flow); - - if (event) { - event->setDetails(detail); - event->setEventDuration(processingDuration); - add(event); - } -} - -void ProvenanceReporter::clone(FlowFileRecord *parent, FlowFileRecord *child) { - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE, - parent); - - if (event) { - event->addChildFlowFile(child); - event->addParentFlowFile(parent); - add(event); - } -} - -void ProvenanceReporter::join(std::vector<FlowFileRecord *> parents, - FlowFileRecord *child, std::string detail, - uint64_t processingDuration) { - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::JOIN, child); - - if (event) { - event->addChildFlowFile(child); - std::vector<FlowFileRecord *>::iterator it; - for (it = parents.begin(); it != parents.end(); it++) { - FlowFileRecord *record = *it; - event->addParentFlowFile(record); - } - event->setDetails(detail); - event->setEventDuration(processingDuration); - add(event); - } -} - -void ProvenanceReporter::fork(std::vector<FlowFileRecord *> child, - FlowFileRecord *parent, std::string detail, - uint64_t processingDuration) { - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK, - parent); - - if (event) { - event->addParentFlowFile(parent); - std::vector<FlowFileRecord *>::iterator it; - for (it = child.begin(); it != child.end(); it++) { - FlowFileRecord *record = *it; - event->addChildFlowFile(record); - } - event->setDetails(detail); - event->setEventDuration(processingDuration); - add(event); - } -} - -void ProvenanceReporter::expire(FlowFileRecord *flow, std::string detail) { - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE, - flow); - - if (event) { - event->setDetails(detail); - add(event); - } -} - -void ProvenanceReporter::drop(FlowFileRecord *flow, std::string reason) { - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::DROP, flow); - - if (event) { - std::string dropReason = "Discard reason: " + reason; - event->setDetails(dropReason); - add(event); - } -} - -void ProvenanceReporter::send(FlowFileRecord *flow, std::string transitUri, - std::string detail, uint64_t processingDuration, bool force) { - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::SEND, flow); - - if (event) { - event->setTransitUri(transitUri); - event->setDetails(detail); - event->setEventDuration(processingDuration); - if (!force) { - add(event); - } else { - if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull()) - event->Serialize( - FlowControllerFactory::getFlowController()->getProvenanceRepository()); - delete event; - } - } -} - -void ProvenanceReporter::receive(FlowFileRecord *flow, std::string transitUri, - std::string sourceSystemFlowFileIdentifier, std::string detail, - uint64_t processingDuration) { - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE, - flow); - - if (event) { - event->setTransitUri(transitUri); - event->setDetails(detail); - event->setEventDuration(processingDuration); - event->setSourceSystemFlowFileIdentifier( - sourceSystemFlowFileIdentifier); - add(event); - } -} - -void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri, - std::string detail, uint64_t processingDuration) { - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FETCH, flow); - - if (event) { - event->setTransitUri(transitUri); - event->setDetails(detail); - event->setEventDuration(processingDuration); - add(event); - } -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/PutFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/PutFile.cpp b/libminifi/src/PutFile.cpp deleted file mode 100644 index d7cc83a..0000000 --- a/libminifi/src/PutFile.cpp +++ /dev/null @@ -1,203 +0,0 @@ -/** - * @file PutFile.cpp - * PutFile class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <sstream> -#include <stdio.h> -#include <string> -#include <iostream> -#include <fstream> -#include <uuid/uuid.h> - -#include "utils/StringUtils.h" -#include "utils/TimeUtil.h" -#include "PutFile.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace"); -const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore"); -const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail"); - -const std::string PutFile::ProcessorName("PutFile"); - -Property PutFile::Directory("Output Directory", "The output directory to which to put files", "."); -Property PutFile::ConflictResolution("Conflict Resolution Strategy", "Indicates what should happen when a file with the same name already exists in the output directory", CONFLICT_RESOLUTION_STRATEGY_FAIL); - -Relationship PutFile::Success("success", "All files are routed to success"); -Relationship PutFile::Failure("failure", "Failed files (conflict, write failure, etc.) are transferred to failure"); - -void PutFile::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(Directory); - properties.insert(ConflictResolution); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - relationships.insert(Failure); - setSupportedRelationships(relationships); -} - -void PutFile::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string directory; - - if (!context->getProperty(Directory.getName(), directory)) - { - logger_->log_error("Directory attribute is missing or invalid"); - return; - } - - std::string conflictResolution; - - if (!context->getProperty(ConflictResolution.getName(), conflictResolution)) - { - logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid"); - return; - } - - FlowFileRecord *flowFile = session->get(); - - // Do nothing if there are no incoming files - if (!flowFile) - { - return; - } - - std::string filename; - flowFile->getAttribute(FILENAME, filename); - - // Generate a safe (universally-unique) temporary filename on the same partition - char tmpFileUuidStr[37]; - uuid_t tmpFileUuid; - uuid_generate(tmpFileUuid); - uuid_unparse_lower(tmpFileUuid, tmpFileUuidStr); - std::stringstream tmpFileSs; - tmpFileSs << directory << "/." << filename << "." << tmpFileUuidStr; - std::string tmpFile = tmpFileSs.str(); - logger_->log_info("PutFile using temporary file %s", tmpFile.c_str()); - - // Determine dest full file paths - std::stringstream destFileSs; - destFileSs << directory << "/" << filename; - std::string destFile = destFileSs.str(); - - logger_->log_info("PutFile writing file %s into directory %s", filename.c_str(), directory.c_str()); - - // If file exists, apply conflict resolution strategy - struct stat statResult; - - if (stat(destFile.c_str(), &statResult) == 0) - { - logger_->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile.c_str(), conflictResolution.c_str()); - - if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_REPLACE) - { - putFile(session, flowFile, tmpFile, destFile); - } - else if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_IGNORE) - { - session->transfer(flowFile, Success); - } - else - { - session->transfer(flowFile, Failure); - } - } - else - { - putFile(session, flowFile, tmpFile, destFile); - } -} - -bool PutFile::putFile(ProcessSession *session, FlowFileRecord *flowFile, const std::string &tmpFile, const std::string &destFile) -{ - - ReadCallback cb(tmpFile, destFile); - session->read(flowFile, &cb); - - if (cb.commit()) - { - session->transfer(flowFile, Success); - return true; - } - else - { - session->transfer(flowFile, Failure); - } - return false; -} - -PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile) -: _tmpFile(tmpFile) -, _tmpFileOs(tmpFile) -, _destFile(destFile) -{ - logger_ = Logger::getLogger(); -} - -// Copy the entire file contents to the temporary file -void PutFile::ReadCallback::process(std::ifstream *stream) -{ - // Copy file contents into tmp file - _writeSucceeded = false; - _tmpFileOs << stream->rdbuf(); - _writeSucceeded = true; -} - -// Renames tmp file to final destination -// Returns true if commit succeeded -bool PutFile::ReadCallback::commit() -{ - bool success = false; - - logger_->log_info("PutFile committing put file operation to %s", _destFile.c_str()); - - if (_writeSucceeded) - { - _tmpFileOs.close(); - - if (rename(_tmpFile.c_str(), _destFile.c_str())) - { - logger_->log_info("PutFile commit put file operation to %s failed because rename() call failed", _destFile.c_str()); - } - else - { - success = true; - logger_->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str()); - } - } - else - { - logger_->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str()); - } - - return success; -} - -// Clean up resources -PutFile::ReadCallback::~ReadCallback() { - // Close tmp file - _tmpFileOs.close(); - - // Clean up tmp file, if necessary - unlink(_tmpFile.c_str()); -}
