http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ListenSyslog.cpp b/libminifi/src/ListenSyslog.cpp
deleted file mode 100644
index f2901e0..0000000
--- a/libminifi/src/ListenSyslog.cpp
+++ /dev/null
@@ -1,343 +0,0 @@
-/**
- * @file ListenSyslog.cpp
- * ListenSyslog class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <queue>
-#include <stdio.h>
-#include <string>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "ListenSyslog.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-// Static descriptors published by the ListenSyslog processor.
-// Each Property/Relationship below is built from string literals; the first
-// one is the name later used for lookups via getName().
-const std::string ListenSyslog::ProcessorName("ListenSyslog");
-
-Property ListenSyslog::RecvBufSize(
-    "Receive Buffer Size",
-    "The size of each buffer used to receive Syslog messages.",
-    "65507 B");
-
-Property ListenSyslog::MaxSocketBufSize(
-    "Max Size of Socket Buffer",
-    "The maximum size of the socket buffer that should be used.",
-    "1 MB");
-
-Property ListenSyslog::MaxConnections(
-    "Max Number of TCP Connections",
-    "The maximum number of concurrent connections to accept Syslog messages in TCP mode.",
-    "2");
-
-Property ListenSyslog::MaxBatchSize(
-    "Max Batch Size",
-    "The maximum number of Syslog events to add to a single FlowFile.",
-    "1");
-
-Property ListenSyslog::MessageDelimiter(
-    "Message Delimiter",
-    "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).",
-    "\n");
-
-Property ListenSyslog::ParseMessages(
-    "Parse Messages",
-    "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.",
-    "false");
-
-Property ListenSyslog::Protocol(
-    "Protocol",
-    "The protocol for Syslog communication.",
-    "UDP");
-
-Property ListenSyslog::Port(
-    "Port",
-    "The port for Syslog communication.",
-    "514");
-
-Relationship ListenSyslog::Success(
-    "success",
-    "All files are routed to success");
-
-Relationship ListenSyslog::Invalid(
-    "invalid",
-    "SysLog message format invalid");
-
-/**
- * Registers the properties and relationships supported by this processor.
- */
-void ListenSyslog::initialize()
-{
-    //! Supported properties
-    std::set<Property> properties = {
-        RecvBufSize,
-        MaxSocketBufSize,
-        MaxConnections,
-        MaxBatchSize,
-        MessageDelimiter,
-        ParseMessages,
-        Protocol,
-        Port
-    };
-    setSupportedProperties(properties);
-
-    //! Supported relationships
-    std::set<Relationship> relationships = { Success, Invalid };
-    setSupportedRelationships(relationships);
-}
-
-/**
- * Starts the detached listener thread the first time it is called.
- * Later calls do nothing because _thread is already set.
- */
-void ListenSyslog::startSocketThread()
-{
-    if (_thread == NULL)
-    {
-        logger_->log_info("ListenSysLog Socket Thread Start");
-        _serverTheadRunning = true;
-        _thread = new std::thread(run, this);
-        _thread->detach();
-    }
-}
-
-/**
- * Thread entry point: forwards to runThread() on the given processor.
- */
-void ListenSyslog::run(ListenSyslog *process)
-{
-    ListenSyslog &self = *process;
-    self.runThread();
-}
-
-/**
- * Body of the listener thread.
- *
- * While _serverTheadRunning is set, each pass of the loop:
- *   1. closes all sockets if a reset was requested (_resetServerSocket),
- *   2. creates and binds the server socket if there is none (TCP also listens),
- *   3. waits up to 100 msec in select() on the server and client sockets,
- *   4. accepts a TCP connection or receives one UDP datagram from the server
- *      socket, and reads one line from every readable TCP client.
- * A received payload is copied and queued with putEvent() only while the
- * queued byte count plus the new payload stays within _recvBufSize.
- *
- * The loop ends, and the thread returns, when socket creation, bind, listen
- * or select() fails.
- */
-void ListenSyslog::runThread()
-{
-    while (_serverTheadRunning)
-    {
-        if (_resetServerSocket)
-        {
-            _resetServerSocket = false;
-            // need to reset the socket: drop every client, then the server socket
-            for (std::vector<int>::iterator it = _clientSockets.begin();
-                    it != _clientSockets.end(); ++it)
-            {
-                close(*it);
-            }
-            _clientSockets.clear();
-            if (_serverSocket > 0)
-            {
-                close(_serverSocket);
-                _serverSocket = 0;
-            }
-        }
-
-        if (_serverSocket <= 0)
-        {
-            uint16_t portno = _port;
-            struct sockaddr_in serv_addr;
-            int sockfd;
-            if (_protocol == "TCP")
-                sockfd = socket(AF_INET, SOCK_STREAM, 0);
-            else
-                sockfd = socket(AF_INET, SOCK_DGRAM, 0);
-            if (sockfd < 0)
-            {
-                logger_->log_error("ListenSysLog Server socket creation failed");
-                break;
-            }
-            bzero((char *) &serv_addr, sizeof(serv_addr));
-            serv_addr.sin_family = AF_INET;
-            serv_addr.sin_addr.s_addr = INADDR_ANY;
-            serv_addr.sin_port = htons(portno);
-            if (bind(sockfd, (struct sockaddr *) &serv_addr,
-                    sizeof(serv_addr)) < 0)
-            {
-                logger_->log_error("ListenSysLog Server socket bind failed");
-                // the descriptor is not stored anywhere yet, so release it here
-                close(sockfd);
-                break;
-            }
-            if (_protocol == "TCP" && listen(sockfd, 5) < 0)
-            {
-                logger_->log_error("ListenSysLog Server socket listen failed");
-                close(sockfd);
-                break;
-            }
-            _serverSocket = sockfd;
-            logger_->log_info("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
-        }
-
-        // Build the read set: the server socket plus every connected client.
-        FD_ZERO(&_readfds);
-        FD_SET(_serverSocket, &_readfds);
-        _maxFds = _serverSocket;
-        for (std::vector<int>::iterator it = _clientSockets.begin();
-                it != _clientSockets.end(); ++it)
-        {
-            int clientSocket = *it;
-            if (clientSocket >= _maxFds)
-                _maxFds = clientSocket;
-            FD_SET(clientSocket, &_readfds);
-        }
-
-        // select() overwrites the set it is given, so hand it a copy.
-        fd_set fds = _readfds;
-        struct timeval tv;
-        tv.tv_sec = 0;
-        // 100 msec
-        tv.tv_usec = 100000;
-        int retval = select(_maxFds + 1, &fds, NULL, NULL, &tv);
-        if (retval < 0)
-            break;
-        if (retval == 0)
-            continue;
-
-        if (FD_ISSET(_serverSocket, &fds))
-        {
-            // server socket, either we have UDP datagram or TCP connection request
-            socklen_t clilen;
-            struct sockaddr_in cli_addr;
-            clilen = sizeof(cli_addr);
-            if (_protocol == "TCP")
-            {
-                int newsockfd = accept(_serverSocket,
-                        (struct sockaddr *) &cli_addr,
-                        &clilen);
-                if (newsockfd > 0)
-                {
-                    if (_clientSockets.size() < _maxConnections)
-                    {
-                        _clientSockets.push_back(newsockfd);
-                        logger_->log_info("ListenSysLog new client socket %d connection", newsockfd);
-                        // the new client is not in fds; pick it up on the next pass
-                        continue;
-                    }
-                    else
-                    {
-                        // connection limit reached: refuse the client
-                        close(newsockfd);
-                    }
-                }
-            }
-            else
-            {
-                int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
-                        (struct sockaddr *)&cli_addr, &clilen);
-                if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize)
-                {
-                    uint8_t *payload = new uint8_t[recvlen];
-                    memcpy(payload, _buffer, recvlen);
-                    putEvent(payload, recvlen);
-                }
-            }
-        }
-
-        std::vector<int>::iterator it = _clientSockets.begin();
-        while (it != _clientSockets.end())
-        {
-            int clientSocket = *it;
-            if (!FD_ISSET(clientSocket, &fds))
-            {
-                // Not readable this round. The iterator must still advance,
-                // otherwise this loop never terminates.
-                ++it;
-                continue;
-            }
-
-            int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer));
-            if (recvlen <= 0)
-            {
-                close(clientSocket);
-                logger_->log_info("ListenSysLog client socket %d close", clientSocket);
-                // erase() returns the iterator following the removed element
-                it = _clientSockets.erase(it);
-                continue;
-            }
-
-            if ((recvlen + getEventQueueByteSize()) <= _recvBufSize)
-            {
-                uint8_t *payload = new uint8_t[recvlen];
-                memcpy(payload, _buffer, recvlen);
-                putEvent(payload, recvlen);
-            }
-            ++it;
-        }
-    }
-    return;
-}
-
-
-/**
- * Reads one '\n'-terminated line from a socket into bufptr.
- *
- * Bytes are fetched with recv() in chunks of up to 2048 and handed out one
- * at a time from a function-static buffer.
- *
- * NOTE(review): the buffer, its cursor and its counter are static, so they
- * are shared by every fd passed to this function; bytes buffered while
- * reading one socket can be returned to a later call made for another one.
- *
- * NOTE(review): when the newline is found, a second '\n' is stored after it
- * and counted in the return value. A terminator was presumably intended;
- * confirm before changing, since callers copy exactly the returned length.
- *
- * @param fd     socket to read from
- * @param bufptr destination buffer
- * @param len    capacity of bufptr in bytes
- * @return number of bytes stored in bufptr (including the extra '\n');
- *         0 when recv() returns 0; -1 when recv() fails with an error other
- *         than EINTR, when len is 0, or when no newline was found within
- *         len - 1 bytes
- */
-int ListenSyslog::readline( int fd, char *bufptr, size_t len )
-{
-    char *bufx = bufptr;
-    static char *bp;
-    static int cnt = 0;
-    static char b[ 2048 ];
-    char c;
-
-    // len is unsigned: with len == 0 the pre-decrement below would wrap to a
-    // huge value and the loop could write far past the caller's buffer.
-    if ( len == 0 )
-        return -1;
-
-    while ( --len > 0 )
-    {
-        if ( --cnt <= 0 )
-        {
-            // internal buffer exhausted: refill it from the socket
-            cnt = recv( fd, b, sizeof( b ), 0 );
-            if ( cnt < 0 )
-            {
-                if ( errno == EINTR )
-                {
-                    len++;        /* the while will decrement */
-                    continue;
-                }
-                return -1;
-            }
-            if ( cnt == 0 )
-                return 0;
-            bp = b;
-        }
-        c = *bp++;
-        *bufptr++ = c;
-        if ( c == '\n' )
-        {
-            *bufptr = '\n';
-            return bufptr - bufx + 1;
-        }
-    }
-    return -1;
-}
-
-/**
- * Refreshes the processor settings from the context, makes sure the listener
- * thread is running, and turns up to _maxBatchSize queued Syslog events into
- * one FlowFile that is transferred to Success.
- *
- * A change of the Protocol or Port property asks the listener thread to
- * re-create its sockets by setting _resetServerSocket. When no event is
- * queued the processor yields.
- *
- * NOTE(review): _messageDelimiter is stored here but is not written between
- * events when several are appended to the same FlowFile in this function.
- *
- * @param context  source of the property values; also used to yield
- * @param session  session used to create, write and transfer the FlowFile
- */
-void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-    std::string value;
-    bool needResetServerSocket = false;
-    if (context->getProperty(Protocol.getName(), value))
-    {
-        if (_protocol != value)
-            needResetServerSocket = true;
-        _protocol = value;
-    }
-    if (context->getProperty(RecvBufSize.getName(), value))
-    {
-        Property::StringToInt(value, _recvBufSize);
-    }
-    if (context->getProperty(MaxSocketBufSize.getName(), value))
-    {
-        Property::StringToInt(value, _maxSocketBufSize);
-    }
-    if (context->getProperty(MaxConnections.getName(), value))
-    {
-        Property::StringToInt(value, _maxConnections);
-    }
-    if (context->getProperty(MessageDelimiter.getName(), value))
-    {
-        _messageDelimiter = value;
-    }
-    if (context->getProperty(ParseMessages.getName(), value))
-    {
-        StringUtils::StringToBool(value, _parseMessages);
-    }
-    if (context->getProperty(Port.getName(), value))
-    {
-        int64_t oldPort = _port;
-        Property::StringToInt(value, _port);
-        if (_port != oldPort)
-            needResetServerSocket = true;
-    }
-    if (context->getProperty(MaxBatchSize.getName(), value))
-    {
-        Property::StringToInt(value, _maxBatchSize);
-    }
-
-    if (needResetServerSocket)
-        _resetServerSocket = true;
-
-    startSocketThread();
-
-    // read from the event queue
-    if (isEventQueueEmpty())
-    {
-        context->yield();
-        return;
-    }
-
-    std::queue<SysLogEvent> eventQueue;
-    pollEvent(eventQueue, _maxBatchSize);
-    FlowFileRecord *flowFile = NULL;
-    while (!eventQueue.empty())
-    {
-        SysLogEvent event = eventQueue.front();
-        eventQueue.pop();
-        if (flowFile == NULL)
-        {
-            // first event of the batch: it creates the FlowFile
-            flowFile = session->create();
-            if (!flowFile)
-            {
-                // The polled payloads are released with delete[] in this
-                // function, so free them all before giving up.
-                delete[] event.payload;
-                while (!eventQueue.empty())
-                {
-                    delete[] eventQueue.front().payload;
-                    eventQueue.pop();
-                }
-                return;
-            }
-            ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
-            session->write(flowFile, &callback);
-        }
-        else
-        {
-            ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
-            session->append(flowFile, &callback);
-        }
-        delete[] event.payload;
-    }
-
-    if (flowFile == NULL)
-    {
-        // pollEvent() handed back nothing, so there is no FlowFile to tag or transfer
-        context->yield();
-        return;
-    }
-
-    flowFile->addAttribute("syslog.protocol", _protocol);
-    flowFile->addAttribute("syslog.port", std::to_string(_port));
-    session->transfer(flowFile, Success);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/LogAppenders.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/LogAppenders.cpp b/libminifi/src/LogAppenders.cpp
deleted file mode 100644
index c90588d..0000000
--- a/libminifi/src/LogAppenders.cpp
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "../include/LogAppenders.h"
-
-// String constants declared by the appender classes.
-// NOTE(review): these look like configuration property keys; confirm against LogAppenders.h.
-const char *OutputStreamAppender::nifi_log_output_stream_error_stderr =
-    "nifi.log.outputstream.appender.error.stderr";
-
-const char *RollingAppender::nifi_log_rolling_apender_file =
-    "nifi.log.rolling.appender.file";
-const char *RollingAppender::nifi_log_rolling_appender_max_files =
-    "nifi.log.rolling.appender.max.files";
-const char *RollingAppender::nifi_log_rolling_appender_max_file_size =
-    "nifi.log.rolling.appender.max.file_size";

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/LogAttribute.cpp b/libminifi/src/LogAttribute.cpp
deleted file mode 100644
index 345eb69..0000000
--- a/libminifi/src/LogAttribute.cpp
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * @file LogAttribute.cpp
- * LogAttribute class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <sstream>
-#include <string.h>
-#include <iostream>
-
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "LogAttribute.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-// Static descriptors published by the LogAttribute processor.
-const std::string LogAttribute::ProcessorName("LogAttribute");
-
-Property LogAttribute::LogLevel(
-    "Log Level",
-    "The Log Level to use when logging the Attributes",
-    "info");
-
-Property LogAttribute::AttributesToLog(
-    "Attributes to Log",
-    "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.",
-    "");
-
-Property LogAttribute::AttributesToIgnore(
-    "Attributes to Ignore",
-    "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.",
-    "");
-
-Property LogAttribute::LogPayload(
-    "Log Payload",
-    "If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.",
-    "false");
-
-Property LogAttribute::LogPrefix(
-    "Log prefix",
-    "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.",
-    "");
-
-Relationship LogAttribute::Success(
-    "success",
-    "success operational on the flow record");
-
-/**
- * Registers the properties and the single relationship supported by this
- * processor.
- */
-void LogAttribute::initialize()
-{
-    //! Supported properties
-    std::set<Property> properties = {
-        LogLevel,
-        AttributesToLog,
-        AttributesToIgnore,
-        LogPayload,
-        LogPrefix
-    };
-    setSupportedProperties(properties);
-
-    //! Supported relationships
-    std::set<Relationship> relationships = { Success };
-    setSupportedRelationships(relationships);
-}
-
-/**
- * Takes one FlowFile from the session, logs a description of it and
- * transfers it to Success.
- *
- * The description lists the standard fields (UUID, dates, size, offset),
- * every attribute, the content claim path when there is one and, if the
- * Log Payload property is true and the content is at most 1 MiB, a hex dump
- * of the payload with 16 bytes per line. The Log Level property selects the
- * logger call; the Log prefix property replaces the dashed separator line.
- *
- * @param context  source of the property values
- * @param session  session the FlowFile is taken from and transferred through
- */
-void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-    FlowFileRecord *flow = session->get();
-    if (!flow)
-        return;
-
-    // Values used when the corresponding property is not available
-    LogAttrLevel level = LogAttrLevelInfo;
-    bool logPayload = false;
-    std::string dashLine = "--------------------------------------------------";
-
-    std::string propertyValue;
-    if (context->getProperty(LogLevel.getName(), propertyValue))
-        logLevelStringToEnum(propertyValue, level);
-    if (context->getProperty(LogPrefix.getName(), propertyValue))
-        dashLine = "-----" + propertyValue + "-----";
-    if (context->getProperty(LogPayload.getName(), propertyValue))
-        StringUtils::StringToBool(propertyValue, logPayload);
-
-    std::ostringstream message;
-    message << "Logging for flow file " << "\n";
-    message << dashLine;
-
-    // Standard fields
-    message << "\nStandard FlowFile Attributes";
-    message << "\n" << "UUID:" << flow->getUUIDStr();
-    message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
-    message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate());
-    message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset();
-
-    // Attribute map
-    message << "\nFlowFile Attributes Map Content";
-    std::map<std::string, std::string> attrs = flow->getAttributes();
-    for (const auto &attr : attrs)
-    {
-        message << "\n" << "key:" << attr.first << " value:" << attr.second;
-    }
-
-    // Content claim
-    message << "\nFlowFile Resource Claim Content";
-    ResourceClaim *claim = flow->getResourceClaim();
-    if (claim)
-    {
-        message << "\n" << "Content Claim:" << claim->getContentFullPath();
-    }
-
-    // Optional hex dump of the payload, 16 bytes per line
-    if (logPayload && flow->getSize() <= 1024*1024)
-    {
-        message << "\n" << "Payload:" << "\n";
-        ReadCallback callback(flow->getSize());
-        session->read(flow, &callback);
-        for (unsigned int i = 0; i < callback._readSize; i++)
-        {
-            char hex[8];
-            sprintf(hex, "%02x ", (unsigned char) (callback._buffer[i]));
-            message << hex;
-            if ((i + 1) % 16 == 0)
-                message << '\n';
-        }
-    }
-    message << "\n" << dashLine << std::ends;
-    std::string output = message.str();
-
-    // Emit through the logger call matching the configured level
-    switch (level)
-    {
-    case LogAttrLevelTrace:
-        logger_->log_trace("%s", output.c_str());
-        break;
-    case LogAttrLevelDebug:
-        logger_->log_debug("%s", output.c_str());
-        break;
-    case LogAttrLevelInfo:
-        logger_->log_info("%s", output.c_str());
-        break;
-    case LogAttrLevelWarn:
-        logger_->log_warn("%s", output.c_str());
-        break;
-    case LogAttrLevelError:
-        logger_->log_error("%s", output.c_str());
-        break;
-    default:
-        break;
-    }
-
-    // Test Import
-    /*
-    FlowFileRecord *importRecord = session->create();
-    session->import(claim->getContentFullPath(), importRecord);
-    session->transfer(importRecord, Success); */
-
-    // Transfer to the relationship
-    session->transfer(flow, Success);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Logger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Logger.cpp b/libminifi/src/Logger.cpp
deleted file mode 100644
index e90667d..0000000
--- a/libminifi/src/Logger.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * @file Logger.cpp
- * Logger class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "../include/Logger.h"
-
-#include <vector>
-#include <queue>
-#include <map>
-
-
-// Definition of the static logger pointer; it starts out empty.
-std::shared_ptr<Logger> Logger::singleton_logger_ = nullptr;
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
deleted file mode 100644
index 7e3527e..0000000
--- a/libminifi/src/ProcessGroup.cpp
+++ /dev/null
@@ -1,307 +0,0 @@
-/**
- * @file ProcessGroup.cpp
- * ProcessGroup class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-
-#include "ProcessGroup.h"
-#include "Processor.h"
-
-/**
- * Creates a process group.
- *
- * @param type   kind of group (compared with ROOT_PROCESS_GROUP elsewhere)
- * @param name   group name, used in log messages
- * @param uuid   UUID to copy; when null a new one is generated
- * @param parent parent group pointer, stored as given
- */
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
-        ProcessGroup *parent) :
-        name_(name), type_(type), parent_process_group_(parent) {
-    yield_period_msec_ = 0;
-    transmitting_ = false;
-
-    if (uuid)
-        uuid_copy(uuid_, uuid);
-    else
-        // Generate the global UUID for the flow record
-        uuid_generate(uuid_);
-
-    logger_ = Logger::getLogger();
-    logger_->log_info("ProcessGroup %s created", name_.c_str());
-}
-
-/**
- * Drains and deletes every connection, then deletes the child process
- * groups and the processors held by this group.
- */
-ProcessGroup::~ProcessGroup() {
-    for (Connection *connection : connections_) {
-        connection->drain();
-        delete connection;
-    }
-
-    for (ProcessGroup *childGroup : child_process_groups_) {
-        delete childGroup;
-    }
-
-    for (Processor *processor : processors_) {
-        delete processor;
-    }
-}
-
-/**
- * Adds every connection of this group and, recursively, of its child groups
- * to connectionMap, keyed by the connection's UUID string.
- *
- * @param connectionMap map that receives the entries
- */
-void ProcessGroup::getConnections(std::map<std::string, Connection*> *connectionMap)
-{
-    std::map<std::string, Connection*> &target = *connectionMap;
-
-    for (std::set<Connection *>::iterator it = connections_.begin();
-            it != connections_.end(); ++it)
-    {
-        Connection *connection = *it;
-        target[connection->getUUIDStr()] = connection;
-    }
-
-    for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
-            it != child_process_groups_.end(); ++it)
-    {
-        (*it)->getConnections(connectionMap);
-    }
-}
-
-/**
- * @return true when this group's type is ROOT_PROCESS_GROUP (read under mtx_)
- */
-bool ProcessGroup::isRootProcessGroup() {
-    std::lock_guard<std::mutex> lock(mtx_);
-    const bool isRoot = (ROOT_PROCESS_GROUP == type_);
-    return isRoot;
-}
-
-/**
- * Adds a processor to this group unless it is already present.
- *
- * @param processor processor pointer to add
- */
-void ProcessGroup::addProcessor(Processor *processor) {
-    std::lock_guard<std::mutex> lock(mtx_);
-
-    // insert() reports through .second whether the element was newly added
-    const bool added = processors_.insert(processor).second;
-    if (!added)
-        return;
-
-    logger_->log_info("Add processor %s into process group %s",
-            processor->getName().c_str(), name_.c_str());
-}
-
-/**
- * Removes a processor from this group if it is present. The processor
- * object itself is not deleted.
- *
- * @param processor processor pointer to remove
- */
-void ProcessGroup::removeProcessor(Processor *processor) {
-    std::lock_guard<std::mutex> lock(mtx_);
-
-    // erase() by key returns the number of elements removed
-    if (processors_.erase(processor) == 0)
-        return;
-
-    logger_->log_info("Remove processor %s from process group %s",
-            processor->getName().c_str(), name_.c_str());
-}
-
-/**
- * Adds a child process group unless it is already present.
- *
- * @param child child group pointer to add
- */
-void ProcessGroup::addProcessGroup(ProcessGroup *child) {
-    std::lock_guard<std::mutex> lock(mtx_);
-
-    // insert() reports through .second whether the element was newly added
-    const bool added = child_process_groups_.insert(child).second;
-    if (!added)
-        return;
-
-    logger_->log_info("Add child process group %s into process group %s",
-            child->getName().c_str(), name_.c_str());
-}
-
-/**
- * Removes a child process group if it is present. The child object itself
- * is not deleted.
- *
- * @param child child group pointer to remove
- */
-void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
-    std::lock_guard<std::mutex> lock(mtx_);
-
-    // erase() by key returns the number of elements removed
-    if (child_process_groups_.erase(child) == 0)
-        return;
-
-    logger_->log_info("Remove child process group %s from process group %s",
-            child->getName().c_str(), name_.c_str());
-}
-
-/**
- * Schedules every processor of this group that is neither running nor
- * DISABLED on the agent matching its scheduling strategy, then does the same
- * for each child group. Exceptions are logged at debug level and rethrown.
- *
- * @param timeScheduler  agent used for TIMER_DRIVEN processors
- * @param eventScheduler agent used for EVENT_DRIVEN processors
- */
-void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
-        EventDrivenSchedulingAgent *eventScheduler) {
-    std::lock_guard<std::mutex> lock(mtx_);
-
-    try {
-        // Start all the processor node, input and output ports
-        for (std::set<Processor *>::iterator it = processors_.begin();
-                it != processors_.end(); ++it) {
-            Processor *processor = *it;
-            logger_->log_debug("Starting %s", processor->getName().c_str());
-
-            // Skip processors that already run or are disabled
-            if (processor->isRunning()
-                    || processor->getScheduledState() == DISABLED)
-                continue;
-
-            if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
-                timeScheduler->schedule(processor);
-            else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
-                eventScheduler->schedule(processor);
-        }
-        // Start processing the group
-        for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin();
-                it != child_process_groups_.end(); ++it) {
-            (*it)->startProcessing(timeScheduler, eventScheduler);
-        }
-    } catch (std::exception &exception) {
-        logger_->log_debug("Caught Exception %s", exception.what());
-        throw;
-    } catch (...) {
-        logger_->log_debug(
-                "Caught Exception during process group start processing");
-        throw;
-    }
-}
-
-/**
- * Unschedules every processor of this group from the agent matching its
- * scheduling strategy, then does the same for each child group. Exceptions
- * are logged at debug level and rethrown.
- *
- * @param timeScheduler  agent used for TIMER_DRIVEN processors
- * @param eventScheduler agent used for EVENT_DRIVEN processors
- */
-void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
-        EventDrivenSchedulingAgent *eventScheduler) {
-    std::lock_guard<std::mutex> lock(mtx_);
-
-    try {
-        // Stop all the processor node, input and output ports
-        for (Processor *processor : processors_) {
-            if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
-                timeScheduler->unschedule(processor);
-            else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
-                eventScheduler->unschedule(processor);
-        }
-
-        // Then stop the child groups
-        for (ProcessGroup *childGroup : child_process_groups_) {
-            childGroup->stopProcessing(timeScheduler, eventScheduler);
-        }
-    } catch (std::exception &exception) {
-        logger_->log_debug("Caught Exception %s", exception.what());
-        throw;
-    } catch (...) {
-        logger_->log_debug(
-                "Caught Exception during process group stop processing");
-        throw;
-    }
-}
-
-Processor *ProcessGroup::findProcessor(uuid_t uuid) {
-
-       Processor *ret = NULL;
-       // std::lock_guard<std::mutex> lock(_mtx);
-
-       for(auto processor : processors_){
-               logger_->log_info("find processor %s", 
processor->getName().c_str());
-               uuid_t processorUUID;
-
-               if (processor->getUUID(processorUUID)) {
-
-                       char uuid_str[37]; // ex. 
"1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0"
-                       uuid_unparse_lower(processorUUID, uuid_str);
-                       std::string processorUUIDstr = uuid_str;
-                       uuid_unparse_lower(uuid, uuid_str);
-                       std::string uuidStr = uuid_str;
-                       if (processorUUIDstr == uuidStr) {
-                               return processor;
-                       }
-               }
-
-       }
-       for(auto processGroup : child_process_groups_){
-
-               logger_->log_info("find processor child %s",
-                               processGroup->getName().c_str());
-               Processor *processor = processGroup->findProcessor(uuid);
-               if (processor)
-                       return processor;
-       }
-
-       return ret;
-}
-
-Processor *ProcessGroup::findProcessor(std::string processorName) {
-       Processor *ret = NULL;
-
-       for(auto processor : processors_){
-               logger_->log_debug("Current processor is %s",
-                               processor->getName().c_str());
-               if (processor->getName() == processorName)
-                       return processor;
-       }
-
-       for(auto processGroup : child_process_groups_){
-               Processor *processor = 
processGroup->findProcessor(processorName);
-               if (processor)
-                       return processor;
-       }
-
-       return ret;
-}
-
-void ProcessGroup::updatePropertyValue(std::string processorName,
-               std::string propertyName, std::string propertyValue) {
-       std::lock_guard<std::mutex> lock(mtx_);
-
-       for(auto processor : processors_){
-               if (processor->getName() == processorName) {
-                       processor->setProperty(propertyName, propertyValue);
-               }
-       }
-
-       for(auto processGroup : child_process_groups_){
-               processGroup->updatePropertyValue(processorName, propertyName,
-                               propertyValue);
-       }
-
-       return;
-}
-
-void ProcessGroup::addConnection(Connection *connection) {
-       std::lock_guard<std::mutex> lock(mtx_);
-
-       if (connections_.find(connection) == connections_.end()) {
-               // We do not have the same connection in this process group yet
-               connections_.insert(connection);
-               logger_->log_info("Add connection %s into process group %s",
-                               connection->getName().c_str(), name_.c_str());
-               uuid_t sourceUUID;
-               Processor *source = NULL;
-               connection->getSourceProcessorUUID(sourceUUID);
-               source = this->findProcessor(sourceUUID);
-               if (source)
-                       source->addConnection(connection);
-               Processor *destination = NULL;
-               uuid_t destinationUUID;
-               connection->getDestinationProcessorUUID(destinationUUID);
-               destination = this->findProcessor(destinationUUID);
-               if (destination && destination != source)
-                       destination->addConnection(connection);
-       }
-}
-
-void ProcessGroup::removeConnection(Connection *connection) {
-       std::lock_guard<std::mutex> lock(mtx_);
-
-       if (connections_.find(connection) != connections_.end()) {
-               // We do not have the same connection in this process group yet
-               connections_.erase(connection);
-               logger_->log_info("Remove connection %s into process group %s",
-                               connection->getName().c_str(), name_.c_str());
-               uuid_t sourceUUID;
-               Processor *source = NULL;
-               connection->getSourceProcessorUUID(sourceUUID);
-               source = this->findProcessor(sourceUUID);
-               if (source)
-                       source->removeConnection(connection);
-               Processor *destination = NULL;
-               uuid_t destinationUUID;
-               connection->getDestinationProcessorUUID(destinationUUID);
-               destination = this->findProcessor(destinationUUID);
-               if (destination && destination != source)
-                       destination->removeConnection(connection);
-       }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp
deleted file mode 100644
index 3b3eb64..0000000
--- a/libminifi/src/ProcessSession.cpp
+++ /dev/null
@@ -1,790 +0,0 @@
-/**
- * @file ProcessSession.cpp
- * ProcessSession class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <iostream>
-
-#include "ProcessSession.h"
-#include "FlowController.h"
-
-ProcessSession::ProcessSession(ProcessContext *processContext) : 
_processContext(processContext) {
-       logger_ = Logger::getLogger();
-       logger_->log_trace("ProcessSession created for %s", 
_processContext->getProcessor()->getName().c_str());
-       _provenanceReport = NULL;
-       if 
(FlowControllerFactory::getFlowController()->getProvenanceRepository()->isEnable())
-       {
-               _provenanceReport = new 
ProvenanceReporter(_processContext->getProcessor()->getUUIDStr(),
-                                       
_processContext->getProcessor()->getName());
-       }
-}
-
-FlowFileRecord* ProcessSession::create()
-{
-       std::map<std::string, std::string> empty;
-       FlowFileRecord *record = new FlowFileRecord(empty);
-
-       if (record)
-       {
-               _addedFlowFiles[record->getUUIDStr()] = record;
-               logger_->log_debug("Create FlowFile with UUID %s", 
record->getUUIDStr().c_str());
-               std::string details = 
_processContext->getProcessor()->getName() + " creates flow record " +  
record->getUUIDStr();
-               if (_provenanceReport)
-                       _provenanceReport->create(record, details);
-       }
-
-       return record;
-}
-
-FlowFileRecord* ProcessSession::create(FlowFileRecord *parent)
-{
-       std::map<std::string, std::string> empty;
-       FlowFileRecord *record = new FlowFileRecord(empty);
-
-       if (record)
-       {
-               _addedFlowFiles[record->getUUIDStr()] = record;
-               logger_->log_debug("Create FlowFile with UUID %s", 
record->getUUIDStr().c_str());
-       }
-
-       if (record)
-       {
-               // Copy attributes
-               std::map<std::string, std::string> parentAttributes = 
parent->getAttributes();
-           std::map<std::string, std::string>::iterator it;
-           for (it = parentAttributes.begin(); it!= parentAttributes.end(); 
it++)
-           {
-               if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
-                               it->first == FlowAttributeKey(DISCARD_REASON) ||
-                                       it->first == FlowAttributeKey(UUID))
-                       // Do not copy special attributes from parent
-                       continue;
-               record->setAttribute(it->first, it->second);
-           }
-           record->_lineageStartDate = parent->_lineageStartDate;
-           record->_lineageIdentifiers = parent->_lineageIdentifiers;
-           record->_lineageIdentifiers.insert(parent->_uuidStr);
-
-       }
-       return record;
-}
-
-FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
-{
-       FlowFileRecord *record = this->create(parent);
-       if (record)
-       {
-               // Copy Resource Claim
-               record->_claim = parent->_claim;
-               if (record->_claim)
-               {
-                       record->_offset = parent->_offset;
-                       record->_size = parent->_size;
-                       record->_claim->increaseFlowFileRecordOwnedCount();
-               }
-               if (_provenanceReport)
-                       _provenanceReport->clone(parent, record);
-       }
-       return record;
-}
-
-FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
-{
-       std::map<std::string, std::string> empty;
-       FlowFileRecord *record = new FlowFileRecord(empty);
-
-       if (record)
-       {
-               this->_clonedFlowFiles[record->getUUIDStr()] = record;
-               logger_->log_debug("Clone FlowFile with UUID %s during 
transfer", record->getUUIDStr().c_str());
-               // Copy attributes
-               std::map<std::string, std::string> parentAttributes = 
parent->getAttributes();
-               std::map<std::string, std::string>::iterator it;
-               for (it = parentAttributes.begin(); it!= 
parentAttributes.end(); it++)
-               {
-                       if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) 
||
-                               it->first == FlowAttributeKey(DISCARD_REASON) ||
-                                       it->first == FlowAttributeKey(UUID))
-                       // Do not copy special attributes from parent
-                       continue;
-               record->setAttribute(it->first, it->second);
-           }
-           record->_lineageStartDate = parent->_lineageStartDate;
-           record->_lineageIdentifiers = parent->_lineageIdentifiers;
-           record->_lineageIdentifiers.insert(parent->_uuidStr);
-
-           // Copy Resource Claim
-           record->_claim = parent->_claim;
-           if (record->_claim)
-           {
-               record->_offset = parent->_offset;
-               record->_size = parent->_size;
-               record->_claim->increaseFlowFileRecordOwnedCount();
-           }
-           if (_provenanceReport)
-               _provenanceReport->clone(parent, record);
-       }
-
-       return record;
-}
-
-FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, 
long size)
-{
-       FlowFileRecord *record = this->create(parent);
-       if (record)
-       {
-               if (parent->_claim)
-               {
-                       if ((offset + size) > (long) parent->_size)
-                       {
-                               // Set offset and size
-                               logger_->log_error("clone offset %d and size %d 
exceed parent size %d",
-                                               offset, size, parent->_size);
-                               // Remove the Add FlowFile for the session
-                               std::map<std::string, FlowFileRecord 
*>::iterator it =
-                                               
this->_addedFlowFiles.find(record->getUUIDStr());
-                               if (it != this->_addedFlowFiles.end())
-                                       
this->_addedFlowFiles.erase(record->getUUIDStr());
-                               delete record;
-                               return NULL;
-                       }
-                       record->_offset = parent->_offset + parent->_offset;
-                       record->_size = size;
-                       // Copy Resource Claim
-                       record->_claim = parent->_claim;
-                       record->_claim->increaseFlowFileRecordOwnedCount();
-               }
-               if (_provenanceReport)
-                       _provenanceReport->clone(parent, record);
-       }
-       return record;
-}
-
-void ProcessSession::remove(FlowFileRecord *flow)
-{
-       flow->_markedDelete = true;
-       _deletedFlowFiles[flow->getUUIDStr()] = flow;
-       std::string reason = _processContext->getProcessor()->getName() + " 
drop flow record " +  flow->getUUIDStr();
-       if (_provenanceReport)
-               _provenanceReport->drop(flow, reason);
-}
-
-void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, 
std::string value)
-{
-       flow->setAttribute(key, value);
-       std::string details = _processContext->getProcessor()->getName() + " 
modify flow record " +  flow->getUUIDStr() +
-                       " attribute " + key + ":" + value;
-       if (_provenanceReport)
-               _provenanceReport->modifyAttributes(flow, details);
-}
-
-void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
-{
-       flow->removeAttribute(key);
-       std::string details = _processContext->getProcessor()->getName() + " 
remove flow record " +  flow->getUUIDStr() +
-                               " attribute " + key;
-       if (_provenanceReport)
-               _provenanceReport->modifyAttributes(flow, details);
-}
-
-void ProcessSession::penalize(FlowFileRecord *flow)
-{
-       flow->_penaltyExpirationMs = getTimeMillis() + 
this->_processContext->getProcessor()->getPenalizationPeriodMsec();
-}
-
-void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship)
-{
-       _transferRelationship[flow->getUUIDStr()] = relationship;
-}
-
-void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback 
*callback)
-{
-       ResourceClaim *claim = NULL;
-
-       claim = new ResourceClaim();
-
-       try
-       {
-               std::ofstream fs;
-               uint64_t startTime = getTimeMillis();
-               fs.open(claim->getContentFullPath().c_str(), std::fstream::out 
| std::fstream::binary | std::fstream::trunc);
-               if (fs.is_open())
-               {
-                       // Call the callback to write the content
-                       callback->process(&fs);
-                       if (fs.good() && fs.tellp() >= 0)
-                       {
-                               flow->_size = fs.tellp();
-                               flow->_offset = 0;
-                               if (flow->_claim)
-                               {
-                                       // Remove the old claim
-                                       
flow->_claim->decreaseFlowFileRecordOwnedCount();
-                                       flow->_claim = NULL;
-                               }
-                               flow->_claim = claim;
-                               claim->increaseFlowFileRecordOwnedCount();
-                               /*
-                               logger_->log_debug("Write offset %d length %d 
into content %s for FlowFile UUID %s",
-                                               flow->_offset, flow->_size, 
flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-                               fs.close();
-                               std::string details = 
_processContext->getProcessor()->getName() + " modify flow record content " +  
flow->getUUIDStr();
-                               uint64_t endTime = getTimeMillis();
-                               if (_provenanceReport)
-                                       _provenanceReport->modifyContent(flow, 
details, endTime - startTime);
-                       }
-                       else
-                       {
-                               fs.close();
-                               throw Exception(FILE_OPERATION_EXCEPTION, "File 
Write Error");
-                       }
-               }
-               else
-               {
-                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open 
Error");
-               }
-       }
-       catch (std::exception &exception)
-       {
-               if (flow && flow->_claim == claim)
-               {
-                       flow->_claim->decreaseFlowFileRecordOwnedCount();
-                       flow->_claim = NULL;
-               }
-               if (claim)
-                       delete claim;
-               logger_->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               if (flow && flow->_claim == claim)
-               {
-                       flow->_claim->decreaseFlowFileRecordOwnedCount();
-                       flow->_claim = NULL;
-               }
-               if (claim)
-                       delete claim;
-               logger_->log_debug("Caught Exception during process session 
write");
-               throw;
-       }
-}
-
-void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback 
*callback)
-{
-       ResourceClaim *claim = NULL;
-
-       if (flow->_claim == NULL)
-       {
-               // No existed claim for append, we need to create new claim
-               return write(flow, callback);
-       }
-
-       claim = flow->_claim;
-
-       try
-       {
-               std::ofstream fs;
-               uint64_t startTime = getTimeMillis();
-               fs.open(claim->getContentFullPath().c_str(), std::fstream::out 
| std::fstream::binary | std::fstream::app);
-               if (fs.is_open())
-               {
-                       // Call the callback to write the content
-                       std::streampos oldPos = fs.tellp();
-                       callback->process(&fs);
-                       if (fs.good() && fs.tellp() >= 0)
-                       {
-                               uint64_t appendSize = fs.tellp() - oldPos;
-                               flow->_size += appendSize;
-                               /*
-                               logger_->log_debug("Append offset %d extra 
length %d to new size %d into content %s for FlowFile UUID %s",
-                                               flow->_offset, appendSize, 
flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); 
*/
-                               fs.close();
-                               std::string details = 
_processContext->getProcessor()->getName() + " modify flow record content " +  
flow->getUUIDStr();
-                               uint64_t endTime = getTimeMillis();
-                               if (_provenanceReport)
-                                       _provenanceReport->modifyContent(flow, 
details, endTime - startTime);
-                       }
-                       else
-                       {
-                               fs.close();
-                               throw Exception(FILE_OPERATION_EXCEPTION, "File 
Write Error");
-                       }
-               }
-               else
-               {
-                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open 
Error");
-               }
-       }
-       catch (std::exception &exception)
-       {
-               logger_->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               logger_->log_debug("Caught Exception during process session 
append");
-               throw;
-       }
-}
-
-void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
-{
-       try
-       {
-               ResourceClaim *claim = NULL;
-               if (flow->_claim == NULL)
-               {
-                       // No existed claim for read, we throw exception
-                       throw Exception(FILE_OPERATION_EXCEPTION, "No Content 
Claim existed for read");
-               }
-
-               claim = flow->_claim;
-               std::ifstream fs;
-               fs.open(claim->getContentFullPath().c_str(), std::fstream::in | 
std::fstream::binary);
-               if (fs.is_open())
-               {
-                       fs.seekg(flow->_offset, fs.beg);
-
-                       if (fs.good())
-                       {
-                               callback->process(&fs);
-                               /*
-                               logger_->log_debug("Read offset %d size %d 
content %s for FlowFile UUID %s",
-                                               flow->_offset, flow->_size, 
claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-                               fs.close();
-                       }
-                       else
-                       {
-                               fs.close();
-                               throw Exception(FILE_OPERATION_EXCEPTION, "File 
Read Error");
-                       }
-               }
-               else
-               {
-                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open 
Error");
-               }
-       }
-       catch (std::exception &exception)
-       {
-               logger_->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               logger_->log_debug("Caught Exception during process session 
read");
-               throw;
-       }
-}
-
-void ProcessSession::import(std::string source, FlowFileRecord *flow, bool 
keepSource, uint64_t offset)
-{
-       ResourceClaim *claim = NULL;
-
-       claim = new ResourceClaim();
-       char *buf = NULL;
-       int size = 4096;
-       buf = new char [size];
-
-       try
-       {
-               std::ofstream fs;
-               uint64_t startTime = getTimeMillis();
-               fs.open(claim->getContentFullPath().c_str(), std::fstream::out 
| std::fstream::binary | std::fstream::trunc);
-               std::ifstream input;
-               input.open(source.c_str(), std::fstream::in | 
std::fstream::binary);
-
-               if (fs.is_open() && input.is_open())
-               {
-                       // Open the source file and stream to the flow file
-                       input.seekg(offset, fs.beg);
-                       while (input.good())
-                       {
-                               input.read(buf, size);
-                               if (input)
-                                       fs.write(buf, size);
-                               else
-                                       fs.write(buf, input.gcount());
-                       }
-
-                       if (fs.good() && fs.tellp() >= 0)
-                       {
-                               flow->_size = fs.tellp();
-                               flow->_offset = 0;
-                               if (flow->_claim)
-                               {
-                                       // Remove the old claim
-                                       
flow->_claim->decreaseFlowFileRecordOwnedCount();
-                                       flow->_claim = NULL;
-                               }
-                               flow->_claim = claim;
-                               claim->increaseFlowFileRecordOwnedCount();
-
-                               logger_->log_debug("Import offset %d length %d 
into content %s for FlowFile UUID %s",
-                                               flow->_offset, flow->_size, 
flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
-
-                               fs.close();
-                               input.close();
-                               if (!keepSource)
-                                       std::remove(source.c_str());
-                               std::string details = 
_processContext->getProcessor()->getName() + " modify flow record content " +  
flow->getUUIDStr();
-                               uint64_t endTime = getTimeMillis();
-                               if (_provenanceReport)
-                                       _provenanceReport->modifyContent(flow, 
details, endTime - startTime);
-                       }
-                       else
-                       {
-                               fs.close();
-                               input.close();
-                               throw Exception(FILE_OPERATION_EXCEPTION, "File 
Import Error");
-                       }
-               }
-               else
-               {
-                       throw Exception(FILE_OPERATION_EXCEPTION, "File Import 
Error");
-               }
-
-               delete[] buf;
-       }
-       catch (std::exception &exception)
-       {
-               if (flow && flow->_claim == claim)
-               {
-                       flow->_claim->decreaseFlowFileRecordOwnedCount();
-                       flow->_claim = NULL;
-               }
-               if (claim)
-                       delete claim;
-               logger_->log_debug("Caught Exception %s", exception.what());
-               delete[] buf;
-               throw;
-       }
-       catch (...)
-       {
-               if (flow && flow->_claim == claim)
-               {
-                       flow->_claim->decreaseFlowFileRecordOwnedCount();
-                       flow->_claim = NULL;
-               }
-               if (claim)
-                       delete claim;
-               logger_->log_debug("Caught Exception during process session 
write");
-               delete[] buf;
-               throw;
-       }
-}
-
-void ProcessSession::commit()
-{
-       try
-       {
-               // First we clone the flow record based on the transfered 
relationship for updated flow record
-               for (auto && it : _updatedFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       if (record->_markedDelete)
-                               continue;
-                       std::map<std::string, Relationship>::iterator 
itRelationship =
-                                       
this->_transferRelationship.find(record->getUUIDStr());
-                       if (itRelationship != _transferRelationship.end())
-                       {
-                               Relationship relationship = 
itRelationship->second;
-                               // Find the relationship, we need to find the 
connections for that relationship
-                               std::set<Connection *> connections =
-                                               
_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
-                               if (connections.empty())
-                               {
-                                       // No connection
-                                       if 
(!_processContext->getProcessor()->isAutoTerminated(relationship))
-                                       {
-                                               // Not autoterminate, we should 
have the connect
-                                               std::string message = "Connect 
empty for non auto terminated relationship" + relationship.getName();
-                                               throw 
Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
-                                       }
-                                       else
-                                       {
-                                               // Autoterminated
-                                               remove(record);
-                                       }
-                               }
-                               else
-                               {
-                                       // We connections, clone the flow and 
assign the connection accordingly
-                                       for (std::set<Connection *>::iterator 
itConnection = connections.begin(); itConnection != connections.end(); 
++itConnection)
-                                       {
-                                               Connection 
*connection(*itConnection);
-                                               if (itConnection == 
connections.begin())
-                                               {
-                                                       // First connection 
which the flow need be routed to
-                                                       record->_connection = 
connection;
-                                               }
-                                               else
-                                               {
-                                                       // Clone the flow file 
and route to the connection
-                                                       FlowFileRecord 
*cloneRecord;
-                                                       cloneRecord = 
this->cloneDuringTransfer(record);
-                                                       if (cloneRecord)
-                                                               
cloneRecord->_connection = connection;
-                                                       else
-                                                               throw 
Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
-                                               }
-                                       }
-                               }
-                       }
-                       else
-                       {
-                               // Can not find relationship for the flow
-                               throw Exception(PROCESS_SESSION_EXCEPTION, "Can 
not find the transfer relationship for the flow");
-                       }
-               }
-               // Do the samething for added flow file
-               for(const auto it : _addedFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       if (record->_markedDelete)
-                               continue;
-                       std::map<std::string, Relationship>::iterator 
itRelationship =
-                                       
this->_transferRelationship.find(record->getUUIDStr());
-                       if (itRelationship != _transferRelationship.end())
-                       {
-                               Relationship relationship = 
itRelationship->second;
-                               // Find the relationship, we need to find the 
connections for that relationship
-                               std::set<Connection *> connections =
-                                               
_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
-                               if (connections.empty())
-                               {
-                                       // No connection
-                                       if 
(!_processContext->getProcessor()->isAutoTerminated(relationship))
-                                       {
-                                               // Not autoterminate, we should 
have the connect
-                                               std::string message = "Connect 
empty for non auto terminated relationship " + relationship.getName();
-                                               throw 
Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
-                                       }
-                                       else
-                                       {
-                                               // Autoterminated
-                                               remove(record);
-                                       }
-                               }
-                               else
-                               {
-                                       // We connections, clone the flow and 
assign the connection accordingly
-                                       for (std::set<Connection *>::iterator 
itConnection = connections.begin(); itConnection != connections.end(); 
++itConnection)
-                                       {
-                                               Connection 
*connection(*itConnection);
-                                               if (itConnection == 
connections.begin())
-                                               {
-                                                       // First connection 
which the flow need be routed to
-                                                       record->_connection = 
connection;
-                                               }
-                                               else
-                                               {
-                                                       // Clone the flow file 
and route to the connection
-                                                       FlowFileRecord 
*cloneRecord;
-                                                       cloneRecord = 
this->cloneDuringTransfer(record);
-                                                       if (cloneRecord)
-                                                               
cloneRecord->_connection = connection;
-                                                       else
-                                                               throw 
Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
-                                               }
-                                       }
-                               }
-                       }
-                       else
-                       {
-                               // Can not find relationship for the flow
-                               throw Exception(PROCESS_SESSION_EXCEPTION, "Can 
not find the transfer relationship for the flow");
-                       }
-               }
-               // Complete process the added and update flow files for the 
session, send the flow file to its queue
-               for(const auto &it : _updatedFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       if (record->_markedDelete)
-                       {
-                               continue;
-                       }
-                       if (record->_connection)
-                               record->_connection->put(record);
-                       else
-                               delete record;
-               }
-               for(const auto &it : _addedFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       if (record->_markedDelete)
-                       {
-                               continue;
-                       }
-                       if (record->_connection)
-                               record->_connection->put(record);
-                       else
-                               delete record;
-               }
-               // Process the clone flow files
-               for(const auto &it : _clonedFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       if (record->_markedDelete)
-                       {
-                               continue;
-                       }
-                       if (record->_connection)
-                               record->_connection->put(record);
-                       else
-                               delete record;
-               }
-               // Delete the deleted flow files
-               for(const auto &it : _deletedFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       delete record;
-               }
-               // Delete the snapshot
-               for(const auto &it : _originalFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       delete record;
-               }
-               // All done
-               _updatedFlowFiles.clear();
-               _addedFlowFiles.clear();
-               _clonedFlowFiles.clear();
-               _deletedFlowFiles.clear();
-               _originalFlowFiles.clear();
-               // persistent the provenance report
-               if (this->_provenanceReport)
-                       this->_provenanceReport->commit();
-               logger_->log_trace("ProcessSession committed for %s", 
_processContext->getProcessor()->getName().c_str());
-       }
-       catch (std::exception &exception)
-       {
-               logger_->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               logger_->log_debug("Caught Exception during process session 
commit");
-               throw;
-       }
-}
-
-
-void ProcessSession::rollback()
-{
-       try
-       {
-               // Requeue the snapshot of the flowfile back
-               for(const auto &it : _originalFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       if (record->_orginalConnection)
-                       {
-                               record->_snapshot = false;
-                               record->_orginalConnection->put(record);
-                       }
-                       else
-                               delete record;
-               }
-               _originalFlowFiles.clear();
-               // Process the clone flow files
-               for(const auto &it : _clonedFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       delete record;
-               }
-               _clonedFlowFiles.clear();
-               for(const auto &it : _addedFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       delete record;
-               }
-               _addedFlowFiles.clear();
-               for(const auto &it : _updatedFlowFiles)
-               {
-                       FlowFileRecord *record = it.second;
-                       delete record;
-               }
-               _updatedFlowFiles.clear();
-               _deletedFlowFiles.clear();
-               logger_->log_trace("ProcessSession rollback for %s", 
_processContext->getProcessor()->getName().c_str());
-       }
-       catch (std::exception &exception)
-       {
-               logger_->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               logger_->log_debug("Caught Exception during process session 
roll back");
-               throw;
-       }
-}
-
-FlowFileRecord *ProcessSession::get()
-{
-       Connection *first = 
_processContext->getProcessor()->getNextIncomingConnection();
-
-       if (first == NULL)
-               return NULL;
-
-       Connection *current = first;
-
-       do
-       {
-               std::set<FlowFileRecord *> expired;
-               FlowFileRecord *ret = current->poll(expired);
-               if (expired.size() > 0)
-               {
-                       // Remove expired flow record
-                       for (std::set<FlowFileRecord *>::iterator it = 
expired.begin(); it != expired.end(); ++it)
-                       {
-                               FlowFileRecord *record = *it;
-                               std::string details = 
_processContext->getProcessor()->getName() + " expire flow record " +  
record->getUUIDStr();
-                               if (_provenanceReport)
-                                       _provenanceReport->expire(record, 
details);
-                               delete (record);
-                       }
-               }
-               if (ret)
-               {
-                       // add the flow record to the current process session 
update map
-                       ret->_markedDelete = false;
-                       _updatedFlowFiles[ret->getUUIDStr()] = ret;
-                       std::map<std::string, std::string> empty;
-                       FlowFileRecord *snapshot = new FlowFileRecord(empty);
-                       logger_->log_debug("Create Snapshot FlowFile with UUID 
%s", snapshot->getUUIDStr().c_str());
-                       snapshot->duplicate(ret);
-                       // save a snapshot
-                       _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
-                       return ret;
-               }
-               current = 
_processContext->getProcessor()->getNextIncomingConnection();
-       }
-       while (current != NULL && current != first);
-
-       return NULL;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ProcessSessionFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSessionFactory.cpp 
b/libminifi/src/ProcessSessionFactory.cpp
deleted file mode 100644
index a105b1c..0000000
--- a/libminifi/src/ProcessSessionFactory.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * @file ProcessSessionFactory.cpp
- * ProcessSessionFactory class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "ProcessSessionFactory.h"
-
-#include <memory>
-
-std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession()
-{
-       return std::unique_ptr<ProcessSession>(new 
ProcessSession(_processContext));
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp
deleted file mode 100644
index 94aaa20..0000000
--- a/libminifi/src/Processor.cpp
+++ /dev/null
@@ -1,526 +0,0 @@
-/**
- * @file Processor.cpp
- * Processor class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <memory>
-#include <functional>
-
-#include "Processor.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-#include "ProcessSessionFactory.h"
-
-Processor::Processor(std::string name, uuid_t uuid)
-: _name(name)
-{
-       if (!uuid)
-               // Generate the global UUID for the flow record
-               uuid_generate(_uuid);
-       else
-               uuid_copy(_uuid, uuid);
-
-       char uuidStr[37];
-       uuid_unparse_lower(_uuid, uuidStr);
-       _uuidStr = uuidStr;
-       _hasWork.store(false);
-       // Setup the default values
-       _state = DISABLED;
-       _strategy = TIMER_DRIVEN;
-       _lossTolerant = false;
-       _triggerWhenEmpty = false;
-       _schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS;
-       _runDurantionNano = 0;
-       _yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
-       _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
-       _maxConcurrentTasks = 1;
-       _activeTasks = 0;
-       _yieldExpiration = 0;
-       _incomingConnectionsIter = this->_incomingConnections.begin();
-       logger_ = Logger::getLogger();
-       logger_->log_info("Processor %s created UUID %s", _name.c_str(), 
_uuidStr.c_str());
-}
-
-Processor::~Processor()
-{
-
-}
-
-bool Processor::isRunning()
-{
-       return (_state == RUNNING && _activeTasks > 0);
-}
-
-void Processor::setScheduledState(ScheduledState state)
-{
-       _state = state;
-}
-
-bool Processor::setSupportedProperties(std::set<Property> properties)
-{
-       if (isRunning())
-       {
-               logger_->log_info("Can not set processor property while the 
process %s is running",
-                               _name.c_str());
-               return false;
-       }
-
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _properties.clear();
-       for (auto item : properties)
-       {
-               _properties[item.getName()] = item;
-               logger_->log_info("Processor %s supported property name %s", 
_name.c_str(), item.getName().c_str());
-       }
-
-       return true;
-}
-
-bool Processor::setSupportedRelationships(std::set<Relationship> relationships)
-{
-       if (isRunning())
-       {
-               logger_->log_info("Can not set processor supported relationship 
while the process %s is running",
-                               _name.c_str());
-               return false;
-       }
-
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _relationships.clear();
-       for(auto item : relationships)
-       {
-               _relationships[item.getName()] = item;
-               logger_->log_info("Processor %s supported relationship name 
%s", _name.c_str(), item.getName().c_str());
-       }
-
-       return true;
-}
-
-bool Processor::setAutoTerminatedRelationships(std::set<Relationship> 
relationships)
-{
-       if (isRunning())
-       {
-               logger_->log_info("Can not set processor auto terminated 
relationship while the process %s is running",
-                               _name.c_str());
-               return false;
-       }
-
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _autoTerminatedRelationships.clear();
-       for(auto item : relationships)
-       {
-               _autoTerminatedRelationships[item.getName()] = item;
-               logger_->log_info("Processor %s auto terminated relationship 
name %s", _name.c_str(), item.getName().c_str());
-       }
-
-       return true;
-}
-
-bool Processor::isAutoTerminated(Relationship relationship)
-{
-       bool isRun = isRunning();
-               
-       auto conditionalLock = !isRun ? 
-                         std::unique_lock<std::mutex>() 
-                       : std::unique_lock<std::mutex>(_mtx);
-
-       const auto &it = 
_autoTerminatedRelationships.find(relationship.getName());
-       if (it != _autoTerminatedRelationships.end())
-       {
-               return true;
-       }
-       else
-       {
-               return false;
-       }
-}
-
-bool Processor::isSupportedRelationship(Relationship relationship)
-{
-       bool isRun = isRunning();
-
-       auto conditionalLock = !isRun ? 
-                         std::unique_lock<std::mutex>() 
-                       : std::unique_lock<std::mutex>(_mtx);
-
-       const auto &it = _relationships.find(relationship.getName());
-       if (it != _relationships.end())
-       {
-               return true;
-       }
-       else
-       {
-               return false;
-       }
-}
-
-bool Processor::getProperty(std::string name, std::string &value)
-{
-       bool isRun = isRunning();
-
-       
-        auto conditionalLock = !isRun ? 
-                           std::unique_lock<std::mutex>() 
-                         : std::unique_lock<std::mutex>(_mtx);
-                        
-       const auto &it = _properties.find(name);
-       if (it != _properties.end())
-       {
-               Property item = it->second;
-               value = item.getValue();
-               return true;
-       }
-       else
-       {
-               return false;
-       }
-}
-
-bool Processor::setProperty(std::string name, std::string value)
-{
-
-       std::lock_guard<std::mutex> lock(_mtx);
-       auto &&it = _properties.find(name);
-
-       if (it != _properties.end())
-       {
-               Property item = it->second;
-               item.setValue(value);
-               _properties[item.getName()] = item;
-               logger_->log_info("Processor %s property name %s value %s", 
_name.c_str(), item.getName().c_str(), value.c_str());
-               return true;
-       }
-       else
-       {
-               return false;
-       }
-}
-
-bool Processor::setProperty(Property prop, std::string value) {
-
-       std::lock_guard<std::mutex> lock(_mtx);
-       auto it = _properties.find(
-                       prop.getName());
-
-       if (it != _properties.end()) {
-               Property item = it->second;
-               item.setValue(value);
-               _properties[item.getName()] = item;
-               logger_->log_info("Processor %s property name %s value %s",
-                               _name.c_str(), item.getName().c_str(), 
value.c_str());
-               return true;
-       } else {
-               Property newProp(prop);
-               newProp.setValue(value);
-               _properties.insert(
-                               std::pair<std::string, 
Property>(prop.getName(), newProp));
-               return true;
-
-               return false;
-       }
-}
-
-std::set<Connection *> Processor::getOutGoingConnections(std::string 
relationship)
-{
-       std::set<Connection *> empty;
-
-       auto  &&it = _outGoingConnections.find(relationship);
-       if (it != _outGoingConnections.end())
-       {
-               return _outGoingConnections[relationship];
-       }
-       else
-       {
-               return empty;
-       }
-}
-
-bool Processor::addConnection(Connection *connection)
-{
-       bool ret = false;
-
-
-       if (isRunning())
-       {
-               logger_->log_info("Can not add connection while the process %s 
is running",
-                               _name.c_str());
-               return false;
-       }
-
-
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       uuid_t srcUUID;
-       uuid_t destUUID;
-
-       connection->getSourceProcessorUUID(srcUUID);
-       connection->getDestinationProcessorUUID(destUUID);
-       char uuid_str[37];
-
-
-       uuid_unparse_lower(_uuid, uuid_str);
-       std::string my_uuid = uuid_str;
-       uuid_unparse_lower(destUUID, uuid_str);
-       std::string destination_uuid = uuid_str;
-       if (my_uuid == destination_uuid)
-       {
-               // Connection is destination to the current processor
-               if (_incomingConnections.find(connection) == 
_incomingConnections.end())
-               {
-                       _incomingConnections.insert(connection);
-                       connection->setDestinationProcessor(this);
-                       logger_->log_info("Add connection %s into Processor %s 
incoming connection",
-                                       connection->getName().c_str(), 
_name.c_str());
-                       _incomingConnectionsIter = 
this->_incomingConnections.begin();
-                       ret = true;
-               }
-       }
-       uuid_unparse_lower(srcUUID, uuid_str);
-       std::string source_uuid = uuid_str;
-       if (my_uuid == source_uuid)
-       {
-               std::string relationship = 
connection->getRelationship().getName();
-               // Connection is source from the current processor
-               auto &&it =
-                               _outGoingConnections.find(relationship);
-               if (it != _outGoingConnections.end())
-               {
-                       // We already has connection for this relationship
-                       std::set<Connection *> existedConnection = it->second;
-                       if (existedConnection.find(connection) == 
existedConnection.end())
-                       {
-                               // We do not have the same connection for this 
relationship yet
-                               existedConnection.insert(connection);
-                               connection->setSourceProcessor(this);
-                               _outGoingConnections[relationship] = 
existedConnection;
-                               logger_->log_info("Add connection %s into 
Processor %s outgoing connection for relationship %s",
-                                                                               
                connection->getName().c_str(), _name.c_str(), 
relationship.c_str());
-                               ret = true;
-                       }
-               }
-               else
-               {
-
-                       // We do not have any outgoing connection for this 
relationship yet
-                       std::set<Connection *> newConnection;
-                       newConnection.insert(connection);
-                       connection->setSourceProcessor(this);
-                       _outGoingConnections[relationship] = newConnection;
-                       logger_->log_info("Add connection %s into Processor %s 
outgoing connection for relationship %s",
-                                                               
connection->getName().c_str(), _name.c_str(), relationship.c_str());
-                       ret = true;
-               }
-       }
-       
-
-       return ret;
-}
-
-void Processor::removeConnection(Connection *connection)
-{
-       if (isRunning())
-       {
-               logger_->log_info("Can not remove connection while the process 
%s is running",
-                               _name.c_str());
-               return;
-       }
-
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       uuid_t srcUUID;
-       uuid_t destUUID;
-
-       connection->getSourceProcessorUUID(srcUUID);
-       connection->getDestinationProcessorUUID(destUUID);
-
-       if (uuid_compare(_uuid, destUUID) == 0)
-       {
-               // Connection is destination to the current processor
-               if (_incomingConnections.find(connection) != 
_incomingConnections.end())
-               {
-                       _incomingConnections.erase(connection);
-                       connection->setDestinationProcessor(NULL);
-                       logger_->log_info("Remove connection %s into Processor 
%s incoming connection",
-                                       connection->getName().c_str(), 
_name.c_str());
-                       _incomingConnectionsIter = 
this->_incomingConnections.begin();
-               }
-       }
-
-       if (uuid_compare(_uuid, srcUUID) == 0)
-       {
-               std::string relationship = 
connection->getRelationship().getName();
-               // Connection is source from the current processor
-               auto &&it =
-                               _outGoingConnections.find(relationship);
-               if (it == _outGoingConnections.end())
-               {
-                       return;
-               }
-               else
-               {
-                       if (_outGoingConnections[relationship].find(connection) 
!= _outGoingConnections[relationship].end())
-                       {
-                               
_outGoingConnections[relationship].erase(connection);
-                               connection->setSourceProcessor(NULL);
-                               logger_->log_info("Remove connection %s into 
Processor %s outgoing connection for relationship %s",
-                                                               
connection->getName().c_str(), _name.c_str(), relationship.c_str());
-                       }
-               }
-       }
-}
-
-Connection *Processor::getNextIncomingConnection()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       if (_incomingConnections.size() == 0)
-               return NULL;
-
-       if (_incomingConnectionsIter == _incomingConnections.end())
-               _incomingConnectionsIter = _incomingConnections.begin();
-
-       Connection *ret = *_incomingConnectionsIter;
-       _incomingConnectionsIter++;
-
-       if (_incomingConnectionsIter == _incomingConnections.end())
-               _incomingConnectionsIter = _incomingConnections.begin();
-
-       return ret;
-}
-
-bool Processor::flowFilesQueued()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       if (_incomingConnections.size() == 0)
-               return false;
-
-       for(auto &&connection : _incomingConnections)
-       {
-               if (connection->getQueueSize() > 0)
-                       return true;
-       }
-
-       return false;
-}
-
-bool Processor::flowFilesOutGoingFull()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-        for(auto &&connection : _outGoingConnections)
-       {
-               // We already has connection for this relationship
-               std::set<Connection *> existedConnection = connection.second;
-               for(const auto connection : existedConnection)
-               {
-                       if (connection->isFull())
-                               return true;
-               }
-       }
-
-       return false;
-}
-
-void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory 
*sessionFactory)
-{
-       auto session = sessionFactory->createSession();
-
-       try {
-               // Call the virtual trigger function
-               onTrigger(context, session.get());
-               session->commit();
-       }
-       catch (std::exception &exception)
-       {
-               logger_->log_debug("Caught Exception %s", exception.what());
-               session->rollback();
-               throw;
-       }
-       catch (...)
-       {
-               logger_->log_debug("Caught Exception Processor::onTrigger");
-               session->rollback();
-               throw;
-       }
-}
-
-void Processor::waitForWork(uint64_t timeoutMs)
-{
-       _hasWork.store( isWorkAvailable() );
-
-       if (!_hasWork.load())
-       {
-           std::unique_lock<std::mutex> lock(_workAvailableMtx);
-           _hasWorkCondition.wait_for(lock, 
std::chrono::milliseconds(timeoutMs), [&] { return _hasWork.load(); });
-       }
-
-}
-
-void Processor::notifyWork()
-{
-       // Do nothing if we are not event-driven
-       if (_strategy != EVENT_DRIVEN)
-       {
-               return;
-       }
-
-       {
-               _hasWork.store( isWorkAvailable() );
-
-
-               if (_hasWork.load())
-               {
-                     _hasWorkCondition.notify_one();
-               }
-       }
-}
-
-bool Processor::isWorkAvailable()
-{
-       // We have work if any incoming connection has work
-       bool hasWork = false;
-
-       try
-       {
-               for (const auto &conn : getIncomingConnections())
-               {
-                       if (conn->getQueueSize() > 0)
-                       {
-                               hasWork = true;
-                               break;
-                       }
-               }
-       }
-       catch (...)
-       {
-               logger_->log_error("Caught an exception while checking if work 
is available; unless it was positively determined that work is available, 
assuming NO work is available!");
-       }
-
-       return hasWork;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Provenance.cpp b/libminifi/src/Provenance.cpp
deleted file mode 100644
index 58cf730..0000000
--- a/libminifi/src/Provenance.cpp
+++ /dev/null
@@ -1,566 +0,0 @@
-/**
- * @file Provenance.cpp
- * Provenance implemenatation 
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <cstdint>
-#include <vector>
-#include <arpa/inet.h>
-#include "io/DataStream.h"
-#include "io/Serializable.h"
-#include "Provenance.h"
-#include "Relationship.h"
-#include "Logger.h"
-#include "FlowController.h"
-
-//! DeSerialize
-bool ProvenanceEventRecord::DeSerialize(ProvenanceRepository *repo,
-               std::string key) {
-       std::string value;
-       bool ret;
-
-       ret = repo->Get(key, value);
-
-       if (!ret) {
-               logger_->log_error("NiFi Provenance Store event %s can not found",
-                               key.c_str());
-               return false;
-       } else
-               logger_->log_debug("NiFi Provenance Read event %s length %d",
-                               key.c_str(), value.length());
-
-
-       DataStream stream((const uint8_t*)value.data(),value.length());
-
-       ret = DeSerialize(stream);
-
-       if (ret) {
-               logger_->log_debug(
-                               "NiFi Provenance retrieve event %s size %d eventType %d success",
-                               _eventIdStr.c_str(), stream.getSize(), _eventType);
-       } else {
-               logger_->log_debug(
-                               "NiFi Provenance retrieve event %s size %d eventType %d fail",
-                               _eventIdStr.c_str(), stream.getSize(), _eventType);
-       }
-
-       return ret;
-}
-
-bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo) {
-
-       DataStream outStream;
-
-       int ret;
-
-       ret = writeUTF(this->_eventIdStr,&outStream);
-       if (ret <= 0) {
-
-               return false;
-       }
-
-       uint32_t eventType = this->_eventType;
-       ret = write(eventType,&outStream);
-       if (ret != 4) {
-
-               return false;
-       }
-
-       ret = write(this->_eventTime,&outStream);
-       if (ret != 8) {
-
-               return false;
-       }
-
-       ret = write(this->_entryDate,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = write(this->_eventDuration,&outStream);
-       if (ret != 8) {
-
-               return false;
-       }
-
-       ret = write(this->_lineageStartDate,&outStream);
-       if (ret != 8) {
-
-               return false;
-       }
-
-       ret = writeUTF(this->_componentId,&outStream);
-       if (ret <= 0) {
-
-               return false;
-       }
-
-       ret = writeUTF(this->_componentType,&outStream);
-       if (ret <= 0) {
-
-               return false;
-       }
-
-       ret = writeUTF(this->_uuid,&outStream);
-       if (ret <= 0) {
-
-               return false;
-       }
-
-       ret = writeUTF(this->_details,&outStream);
-       if (ret <= 0) {
-
-               return false;
-       }
-
-       // write flow attributes
-       uint32_t numAttributes = this->_attributes.size();
-       ret = write(numAttributes,&outStream);
-       if (ret != 4) {
-
-               return false;
-       }
-
-       for (auto itAttribute : _attributes) {
-               ret = writeUTF(itAttribute.first,&outStream, true);
-               if (ret <= 0) {
-
-                       return false;
-               }
-               ret = writeUTF(itAttribute.second,&outStream, true);
-               if (ret <= 0) {
-
-                       return false;
-               }
-       }
-
-       ret = writeUTF(this->_contentFullPath,&outStream);
-       if (ret <= 0) {
-
-               return false;
-       }
-
-       ret = write(this->_size,&outStream);
-       if (ret != 8) {
-
-               return false;
-       }
-
-       ret = write(this->_offset,&outStream);
-       if (ret != 8) {
-
-               return false;
-       }
-
-       ret = writeUTF(this->_sourceQueueIdentifier,&outStream);
-       if (ret <= 0) {
-
-               return false;
-       }
-
-       if (this->_eventType == ProvenanceEventRecord::FORK
-                       || this->_eventType == ProvenanceEventRecord::CLONE
-                       || this->_eventType == ProvenanceEventRecord::JOIN) {
-               // write UUIDs
-               uint32_t number = this->_parentUuids.size();
-               ret = write(number,&outStream);
-               if (ret != 4) {
-
-                       return false;
-               }
-               for (auto parentUUID : _parentUuids) {
-                       ret = writeUTF(parentUUID,&outStream);
-                       if (ret <= 0) {
-
-                               return false;
-                       }
-               }
-               number = this->_childrenUuids.size();
-               ret = write(number,&outStream);
-               if (ret != 4) {
-                       return false;
-               }
-               for (auto childUUID : _childrenUuids) {
-                       ret = writeUTF(childUUID,&outStream);
-                       if (ret <= 0) {
-
-                               return false;
-                       }
-               }
-       } else if (this->_eventType == ProvenanceEventRecord::SEND
-                       || this->_eventType == ProvenanceEventRecord::FETCH) {
-               ret = writeUTF(this->_transitUri,&outStream);
-               if (ret <= 0) {
-
-                       return false;
-               }
-       } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
-               ret = writeUTF(this->_transitUri,&outStream);
-               if (ret <= 0) {
-
-                       return false;
-               }
-               ret = writeUTF(this->_sourceSystemFlowFileIdentifier,&outStream);
-               if (ret <= 0) {
-
-                       return false;
-               }
-       }
-
-       // Persistent to the DB
-
-       if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
-               logger_->log_debug("NiFi Provenance Store event %s size %d success",
-                               _eventIdStr.c_str(), outStream.getSize());
-               return true;
-       } else {
-               logger_->log_error("NiFi Provenance Store event %s size %d fail",
-                               _eventIdStr.c_str(), outStream.getSize());
-               return false;
-       }
-
-       // cleanup
-
-       return true;
-}
-
-bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
-
-       int ret;
-
-       DataStream outStream(buffer,bufferSize);
-
-       ret = readUTF(this->_eventIdStr,&outStream);
-
-       if (ret <= 0) {
-               return false;
-       }
-
-       uint32_t eventType;
-       ret = read(eventType,&outStream);
-       if (ret != 4) {
-               return false;
-       }
-       this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
-
-       ret = read(this->_eventTime,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = read(this->_entryDate,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = read(this->_eventDuration,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = read(this->_lineageStartDate,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = readUTF(this->_componentId,&outStream);
-       if (ret <= 0) {
-               return false;
-       }
-
-       ret = readUTF(this->_componentType,&outStream);
-       if (ret <= 0) {
-               return false;
-       }
-
-       ret = readUTF(this->_uuid,&outStream);
-       if (ret <= 0) {
-               return false;
-       }
-
-       ret = readUTF(this->_details,&outStream);
-
-       if (ret <= 0) {
-               return false;
-       }
-
-       // read flow attributes
-       uint32_t numAttributes = 0;
-       ret = read(numAttributes,&outStream);
-       if (ret != 4) {
-               return false;
-       }
-
-       for (uint32_t i = 0; i < numAttributes; i++) {
-               std::string key;
-               ret = readUTF(key,&outStream, true);
-               if (ret <= 0) {
-                       return false;
-               }
-               std::string value;
-               ret = readUTF(value,&outStream, true);
-               if (ret <= 0) {
-                       return false;
-               }
-               this->_attributes[key] = value;
-       }
-
-       ret = readUTF(this->_contentFullPath,&outStream);
-       if (ret <= 0) {
-               return false;
-       }
-
-       ret = read(this->_size,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = read(this->_offset,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = readUTF(this->_sourceQueueIdentifier,&outStream);
-       if (ret <= 0) {
-               return false;
-       }
-
-       if (this->_eventType == ProvenanceEventRecord::FORK
-                       || this->_eventType == ProvenanceEventRecord::CLONE
-                       || this->_eventType == ProvenanceEventRecord::JOIN) {
-               // read UUIDs
-               uint32_t number = 0;
-               ret = read(number,&outStream);
-               if (ret != 4) {
-                       return false;
-               }
-
-
-               for (uint32_t i = 0; i < number; i++) {
-                       std::string parentUUID;
-                       ret = readUTF(parentUUID,&outStream);
-                       if (ret <= 0) {
-                               return false;
-                       }
-                       this->addParentUuid(parentUUID);
-               }
-               number = 0;
-               ret = read(number,&outStream);
-               if (ret != 4) {
-                       return false;
-               }
-               for (uint32_t i = 0; i < number; i++) {
-                       std::string childUUID;
-                       ret = readUTF(childUUID,&outStream);
-                       if (ret <= 0) {
-                               return false;
-                       }
-                       this->addChildUuid(childUUID);
-               }
-       } else if (this->_eventType == ProvenanceEventRecord::SEND
-                       || this->_eventType == ProvenanceEventRecord::FETCH) {
-               ret = readUTF(this->_transitUri,&outStream);
-               if (ret <= 0) {
-                       return false;
-               }
-       } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
-               ret = readUTF(this->_transitUri,&outStream);
-               if (ret <= 0) {
-                       return false;
-               }
-               ret = readUTF(this->_sourceSystemFlowFileIdentifier,&outStream);
-               if (ret <= 0) {
-                       return false;
-               }
-       }
-
-       return true;
-}
-
-void ProvenanceReporter::commit() {
-       if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isEnable())
-               return;
-       for (auto event : _events) {
-               if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull()) {
-                       event->Serialize(
-                                       FlowControllerFactory::getFlowController()->getProvenanceRepository());
-               } else {
-                       logger_->log_debug("Provenance Repository is full");
-               }
-       }
-}
-
-void ProvenanceReporter::create(FlowFileRecord *flow, std::string detail) {
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE,
-                       flow);
-
-       if (event) {
-               event->setDetails(detail);
-               add(event);
-       }
-}
-
-void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation,
-               std::string detail, uint64_t processingDuration) {
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ROUTE, flow);
-
-       if (event) {
-               event->setDetails(detail);
-               event->setRelationship(relation.getName());
-               event->setEventDuration(processingDuration);
-               add(event);
-       }
-}
-
-void ProvenanceReporter::modifyAttributes(FlowFileRecord *flow,
-               std::string detail) {
-       ProvenanceEventRecord *event = allocate(
-                       ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
-
-       if (event) {
-               event->setDetails(detail);
-               add(event);
-       }
-}
-
-void ProvenanceReporter::modifyContent(FlowFileRecord *flow, std::string detail,
-               uint64_t processingDuration) {
-       ProvenanceEventRecord *event = allocate(
-                       ProvenanceEventRecord::CONTENT_MODIFIED, flow);
-
-       if (event) {
-               event->setDetails(detail);
-               event->setEventDuration(processingDuration);
-               add(event);
-       }
-}
-
-void ProvenanceReporter::clone(FlowFileRecord *parent, FlowFileRecord *child) {
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE,
-                       parent);
-
-       if (event) {
-               event->addChildFlowFile(child);
-               event->addParentFlowFile(parent);
-               add(event);
-       }
-}
-
-void ProvenanceReporter::join(std::vector<FlowFileRecord *> parents,
-               FlowFileRecord *child, std::string detail,
-               uint64_t processingDuration) {
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::JOIN, child);
-
-       if (event) {
-               event->addChildFlowFile(child);
-               std::vector<FlowFileRecord *>::iterator it;
-               for (it = parents.begin(); it != parents.end(); it++) {
-                       FlowFileRecord *record = *it;
-                       event->addParentFlowFile(record);
-               }
-               event->setDetails(detail);
-               event->setEventDuration(processingDuration);
-               add(event);
-       }
-}
-
-void ProvenanceReporter::fork(std::vector<FlowFileRecord *> child,
-               FlowFileRecord *parent, std::string detail,
-               uint64_t processingDuration) {
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK,
-                       parent);
-
-       if (event) {
-               event->addParentFlowFile(parent);
-               std::vector<FlowFileRecord *>::iterator it;
-               for (it = child.begin(); it != child.end(); it++) {
-                       FlowFileRecord *record = *it;
-                       event->addChildFlowFile(record);
-               }
-               event->setDetails(detail);
-               event->setEventDuration(processingDuration);
-               add(event);
-       }
-}
-
-void ProvenanceReporter::expire(FlowFileRecord *flow, std::string detail) {
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE,
-                       flow);
-
-       if (event) {
-               event->setDetails(detail);
-               add(event);
-       }
-}
-
-void ProvenanceReporter::drop(FlowFileRecord *flow, std::string reason) {
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::DROP, flow);
-
-       if (event) {
-               std::string dropReason = "Discard reason: " + reason;
-               event->setDetails(dropReason);
-               add(event);
-       }
-}
-
-void ProvenanceReporter::send(FlowFileRecord *flow, std::string transitUri,
-               std::string detail, uint64_t processingDuration, bool force) {
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::SEND, flow);
-
-       if (event) {
-               event->setTransitUri(transitUri);
-               event->setDetails(detail);
-               event->setEventDuration(processingDuration);
-               if (!force) {
-                       add(event);
-               } else {
-                       if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull())
-                               event->Serialize(
-                                               FlowControllerFactory::getFlowController()->getProvenanceRepository());
-                       delete event;
-               }
-       }
-}
-
-void ProvenanceReporter::receive(FlowFileRecord *flow, std::string transitUri,
-               std::string sourceSystemFlowFileIdentifier, std::string detail,
-               uint64_t processingDuration) {
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE,
-                       flow);
-
-       if (event) {
-               event->setTransitUri(transitUri);
-               event->setDetails(detail);
-               event->setEventDuration(processingDuration);
-               event->setSourceSystemFlowFileIdentifier(
-                               sourceSystemFlowFileIdentifier);
-               add(event);
-       }
-}
-
-void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri,
-               std::string detail, uint64_t processingDuration) {
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FETCH, flow);
-
-       if (event) {
-               event->setTransitUri(transitUri);
-               event->setDetails(detail);
-               event->setEventDuration(processingDuration);
-               add(event);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/PutFile.cpp b/libminifi/src/PutFile.cpp
deleted file mode 100644
index d7cc83a..0000000
--- a/libminifi/src/PutFile.cpp
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * @file PutFile.cpp
- * PutFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <fstream>
-#include <uuid/uuid.h>
-
-#include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
-#include "PutFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace");
-const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore");
-const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail");
-
-const std::string PutFile::ProcessorName("PutFile");
-
-Property PutFile::Directory("Output Directory", "The output directory to which to put files", ".");
-Property PutFile::ConflictResolution("Conflict Resolution Strategy", "Indicates what should happen when a file with the same name already exists in the output directory", CONFLICT_RESOLUTION_STRATEGY_FAIL);
-
-Relationship PutFile::Success("success", "All files are routed to success");
-Relationship PutFile::Failure("failure", "Failed files (conflict, write failure, etc.) are transferred to failure");
-
-void PutFile::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(Directory);
-       properties.insert(ConflictResolution);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       relationships.insert(Failure);
-       setSupportedRelationships(relationships);
-}
-
-void PutFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-       std::string directory;
-
-       if (!context->getProperty(Directory.getName(), directory))
-       {
-               logger_->log_error("Directory attribute is missing or invalid");
-               return;
-       }
-
-       std::string conflictResolution;
-
-       if (!context->getProperty(ConflictResolution.getName(), conflictResolution))
-       {
-               logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid");
-               return;
-       }
-
-       FlowFileRecord *flowFile = session->get();
-
-       // Do nothing if there are no incoming files
-       if (!flowFile)
-       {
-               return;
-       }
-
-       std::string filename;
-       flowFile->getAttribute(FILENAME, filename);
-
-       // Generate a safe (universally-unique) temporary filename on the same partition
-       char tmpFileUuidStr[37];
-       uuid_t tmpFileUuid;
-       uuid_generate(tmpFileUuid);
-       uuid_unparse_lower(tmpFileUuid, tmpFileUuidStr);
-       std::stringstream tmpFileSs;
-       tmpFileSs << directory << "/." << filename << "." << tmpFileUuidStr;
-       std::string tmpFile = tmpFileSs.str();
-       logger_->log_info("PutFile using temporary file %s", tmpFile.c_str());
-
-       // Determine dest full file paths
-       std::stringstream destFileSs;
-       destFileSs << directory << "/" << filename;
-       std::string destFile = destFileSs.str();
-
-       logger_->log_info("PutFile writing file %s into directory %s", filename.c_str(), directory.c_str());
-
-       // If file exists, apply conflict resolution strategy
-       struct stat statResult;
-
-       if (stat(destFile.c_str(), &statResult) == 0)
-       {
-               logger_->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile.c_str(), conflictResolution.c_str());
-
-               if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_REPLACE)
-               {
-                       putFile(session, flowFile, tmpFile, destFile);
-               }
-               else if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_IGNORE)
-               {
-                       session->transfer(flowFile, Success);
-               }
-               else
-               {
-                       session->transfer(flowFile, Failure);
-               }
-       }
-       else
-       {
-               putFile(session, flowFile, tmpFile, destFile);
-       }
-}
-
-bool PutFile::putFile(ProcessSession *session, FlowFileRecord *flowFile, const std::string &tmpFile, const std::string &destFile)
-{
-
-       ReadCallback cb(tmpFile, destFile);
-       session->read(flowFile, &cb);
-
-       if (cb.commit())
-       {
-               session->transfer(flowFile, Success);
-               return true;
-       }
-       else
-       {
-               session->transfer(flowFile, Failure);
-       }
-       return false;
-}
-
-PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile)
-: _tmpFile(tmpFile)
-, _tmpFileOs(tmpFile)
-, _destFile(destFile)
-{
-       logger_ = Logger::getLogger();
-}
-
-// Copy the entire file contents to the temporary file
-void PutFile::ReadCallback::process(std::ifstream *stream)
-{
-       // Copy file contents into tmp file
-       _writeSucceeded = false;
-       _tmpFileOs << stream->rdbuf();
-       _writeSucceeded = true;
-}
-
-// Renames tmp file to final destination
-// Returns true if commit succeeded
-bool PutFile::ReadCallback::commit()
-{
-       bool success = false;
-
-       logger_->log_info("PutFile committing put file operation to %s", _destFile.c_str());
-
-       if (_writeSucceeded)
-       {
-               _tmpFileOs.close();
-
-               if (rename(_tmpFile.c_str(), _destFile.c_str()))
-               {
-                       logger_->log_info("PutFile commit put file operation to %s failed because rename() call failed", _destFile.c_str());
-               }
-               else
-               {
-                       success = true;
-                       logger_->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str());
-               }
-       }
-       else
-       {
-               logger_->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str());
-       }
-
-       return success;
-}
-
-// Clean up resources
-PutFile::ReadCallback::~ReadCallback() {
-       // Close tmp file
-       _tmpFileOs.close();
-
-       // Clean up tmp file, if necessary
-       unlink(_tmpFile.c_str());
-}

Reply via email to