http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ListenSyslog.cpp b/libminifi/src/ListenSyslog.cpp
new file mode 100644
index 0000000..ace37d7
--- /dev/null
+++ b/libminifi/src/ListenSyslog.cpp
@@ -0,0 +1,342 @@
+/**
+ * @file ListenSyslog.cpp
+ * ListenSyslog class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <queue>
+#include <stdio.h>
+#include <string>
+#include "TimeUtil.h"
+#include "ListenSyslog.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+// Name constant for this processor.
+const std::string ListenSyslog::ProcessorName("ListenSyslog");
+// Properties supported by ListenSyslog. The three constructor arguments are
+// presumably (name, description, default value) -- confirm against Property.
+// onTrigger() looks each one up by Property::getName().
+Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each 
buffer used to receive Syslog messages.", "65507 B");
+Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The 
maximum size of the socket buffer that should be used.", "1 MB");
+Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The 
maximum number of concurrent connections to accept Syslog messages in TCP 
mode.", "2");
+Property ListenSyslog::MaxBatchSize("Max Batch Size",
+               "The maximum number of Syslog events to add to a single 
FlowFile.", "1");
+Property ListenSyslog::MessageDelimiter("Message Delimiter",
+               "Specifies the delimiter to place between Syslog messages when 
multiple messages are bundled together (see <Max Batch Size> property).", "\n");
+// NOTE(review): the description text below ends mid-sentence ("will only.").
+Property ListenSyslog::ParseMessages("Parse Messages",
+               "Indicates if the processor should parse the Syslog messages. 
If set to false, each outgoing FlowFile will only.", "false");
+Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog 
communication.", "UDP");
+Property ListenSyslog::Port("Port", "The port for Syslog communication.", 
"514");
+// Relationships that initialize() registers for this processor.
+Relationship ListenSyslog::Success("success", "All files are routed to 
success");
+Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
+
+/**
+ * Registers the properties and relationships that this processor supports.
+ */
+void ListenSyslog::initialize()
+{
+       //! Supported properties
+       std::set<Property> supportedProperties{
+               RecvBufSize,
+               MaxSocketBufSize,
+               MaxConnections,
+               MaxBatchSize,
+               MessageDelimiter,
+               ParseMessages,
+               Protocol,
+               Port
+       };
+       setSupportedProperties(supportedProperties);
+
+       //! Supported relationships
+       std::set<Relationship> supportedRelationships{Success, Invalid};
+       setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Launches the detached socket thread the first time it is called;
+ * later calls are no-ops because _thread is already set.
+ */
+void ListenSyslog::startSocketThread()
+{
+       if (_thread == NULL)
+       {
+               _logger->log_info("ListenSysLog Socket Thread Start");
+               // Raise the run flag before the thread starts so runThread() enters its loop
+               _serverTheadRunning = true;
+               _thread = new std::thread(run, this);
+               _thread->detach();
+       }
+}
+
+/**
+ * Thread entry point handed to std::thread by startSocketThread().
+ * @param process the ListenSyslog instance whose socket loop is run
+ */
+void ListenSyslog::run(ListenSyslog *process)
+{
+       // Forward to the member function that contains the actual loop
+       process->runThread();
+}
+
+/**
+ * Body of the background socket thread.
+ *
+ * Loops while _serverTheadRunning is set, or until an unrecoverable socket
+ * error occurs. Each pass:
+ *   1. closes all sockets when _resetServerSocket has been raised,
+ *   2. creates and binds the server socket when none is open,
+ *   3. waits up to 100 msec in select() on the server and client sockets,
+ *   4. accepts one TCP connection or reads one UDP datagram,
+ *   5. reads one line from every readable TCP client.
+ *
+ * A received payload is copied to the heap and queued with putEvent() only
+ * while the queued byte count plus the payload stays within _recvBufSize;
+ * otherwise it is dropped. onTrigger() frees the queued payloads.
+ */
+void ListenSyslog::runThread()
+{
+       while (_serverTheadRunning)
+       {
+               if (_resetServerSocket)
+               {
+                       _resetServerSocket = false;
+                       // Protocol or port changed: close everything so the listener is rebuilt below
+                       for (std::vector<int>::iterator client = _clientSockets.begin(); client != _clientSockets.end(); ++client)
+                               close(*client);
+                       _clientSockets.clear();
+                       if (_serverSocket > 0)
+                       {
+                               close(_serverSocket);
+                               _serverSocket = 0;
+                       }
+               }
+
+               if (_serverSocket <= 0)
+               {
+                       uint16_t portno = _port;
+                       struct sockaddr_in serv_addr;
+                       int sockfd;
+                       if (_protocol == "TCP")
+                               sockfd = socket(AF_INET, SOCK_STREAM, 0);
+                       else
+                               sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+                       if (sockfd < 0)
+                       {
+                               _logger->log_error("ListenSysLog Server socket creation failed");
+                               break;
+                       }
+                       bzero((char *) &serv_addr, sizeof(serv_addr));
+                       serv_addr.sin_family = AF_INET;
+                       serv_addr.sin_addr.s_addr = INADDR_ANY;
+                       serv_addr.sin_port = htons(portno);
+                       if (bind(sockfd, (struct sockaddr *) &serv_addr,
+                                       sizeof(serv_addr)) < 0)
+                       {
+                               _logger->log_error("ListenSysLog Server socket bind failed");
+                               // Do not leak the descriptor when the thread gives up
+                               close(sockfd);
+                               break;
+                       }
+                       if (_protocol == "TCP" && listen(sockfd, 5) < 0)
+                       {
+                               _logger->log_error("ListenSysLog Server socket listen failed");
+                               close(sockfd);
+                               break;
+                       }
+                       _serverSocket = sockfd;
+                       _logger->log_info("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
+               }
+
+               // Rebuild the read set from the server socket plus every connected client
+               FD_ZERO(&_readfds);
+               FD_SET(_serverSocket, &_readfds);
+               _maxFds = _serverSocket;
+               std::vector<int>::iterator it;
+               for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+               {
+                       int clientSocket = *it;
+                       if (clientSocket >= _maxFds)
+                               _maxFds = clientSocket;
+                       FD_SET(clientSocket, &_readfds);
+               }
+
+               // select() modifies its arguments, so pass a copy of the set and a fresh timeout
+               fd_set fds = _readfds;
+               struct timeval tv;
+               tv.tv_sec = 0;
+               // 100 msec
+               tv.tv_usec = 100000;
+               int retval = select(_maxFds+1, &fds, NULL, NULL, &tv);
+               if (retval < 0)
+               {
+                       // A signal interrupting select() is not fatal; retry
+                       if (errno == EINTR)
+                               continue;
+                       break;
+               }
+               if (retval == 0)
+                       continue;
+
+               if (FD_ISSET(_serverSocket, &fds))
+               {
+                       // server socket, either we have UDP datagram or TCP connection request
+                       socklen_t clilen;
+                       struct sockaddr_in cli_addr;
+                       clilen = sizeof(cli_addr);
+                       if (_protocol == "TCP")
+                       {
+                               int newsockfd = accept(_serverSocket,
+                                               (struct sockaddr *) &cli_addr,
+                                               &clilen);
+                               if (newsockfd > 0)
+                               {
+                                       if (_clientSockets.size() < _maxConnections)
+                                       {
+                                               _clientSockets.push_back(newsockfd);
+                                               _logger->log_info("ListenSysLog new client socket %d connection", newsockfd);
+                                               // The client list changed; start over with a fresh fd set
+                                               continue;
+                                       }
+                                       else
+                                       {
+                                               // Connection limit reached: refuse the new client
+                                               close(newsockfd);
+                                       }
+                               }
+                       }
+                       else
+                       {
+                               int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
+                                               (struct sockaddr *)&cli_addr, &clilen);
+                               if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize)
+                               {
+                                       uint8_t *payload = new uint8_t[recvlen];
+                                       memcpy(payload, _buffer, recvlen);
+                                       putEvent(payload, recvlen);
+                               }
+                       }
+               }
+
+               it = _clientSockets.begin();
+               while (it != _clientSockets.end())
+               {
+                       int clientSocket = *it;
+                       if (!FD_ISSET(clientSocket, &fds))
+                       {
+                               // Nothing to read on this client. The iterator must still
+                               // advance, otherwise this loop never terminates.
+                               ++it;
+                               continue;
+                       }
+
+                       int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer));
+                       if (recvlen <= 0)
+                       {
+                               // Peer closed, read error, or line too long: drop the client
+                               close(clientSocket);
+                               _logger->log_info("ListenSysLog client socket %d close", clientSocket);
+                               it = _clientSockets.erase(it);
+                               continue;
+                       }
+
+                       if ((recvlen + getEventQueueByteSize()) <= _recvBufSize)
+                       {
+                               uint8_t *payload = new uint8_t[recvlen];
+                               memcpy(payload, _buffer, recvlen);
+                               putEvent(payload, recvlen);
+                       }
+                       ++it;
+               }
+       }
+}
+
+
+/**
+ * Copies bytes from a socket into bufptr up to and including the first '\n'.
+ *
+ * Input is pulled from the socket in chunks of up to 2048 bytes into a
+ * function-static buffer and handed out one byte at a time.
+ *
+ * @param fd     socket descriptor passed to recv()
+ * @param bufptr destination buffer
+ * @param len    capacity of bufptr in bytes
+ * @return number of bytes stored when a newline was seen (this count includes
+ *         one extra '\n' written after the line, see below); 0 when recv()
+ *         returns 0; -1 on a recv() error other than EINTR or when the
+ *         destination fills up before a newline is found.
+ *
+ * NOTE(review): the static buffer/cursor/count are not keyed by fd, so bytes
+ * left over from one descriptor appear to be handed to the next call even if
+ * it is for a different descriptor -- confirm whether multiple TCP clients
+ * are expected to work.
+ */
+int ListenSyslog::readline( int fd, char *bufptr, size_t len )
+{
+       char *bufx = bufptr;     // start of the destination, used to compute the length
+       static char *bp;         // read cursor into b, kept across calls
+       static int cnt = 0;      // bookkeeping for unread bytes in b, kept across calls
+       static char b[ 2048 ];   // receive buffer, kept across calls
+       char c;
+
+       // Pre-decrementing len keeps one byte of bufptr in reserve for the
+       // trailing byte written after the newline below.
+       while ( --len > 0 )
+    {
+      // Refill from the socket once the buffered bytes are used up
+      if ( --cnt <= 0 )
+      {
+         cnt = recv( fd, b, sizeof( b ), 0 );
+         if ( cnt < 0 )
+         {
+                 if ( errno == EINTR )
+                 {
+                         len++;                /* the while will decrement */
+                         continue;
+                 }
+                 return -1;
+         }
+         if ( cnt == 0 )
+                 return 0;
+         bp = b;
+      }
+      c = *bp++;
+      *bufptr++ = c;
+      if ( c == '\n' )
+      {
+         // NOTE(review): a second '\n' (not a NUL terminator) is stored after
+         // the newline and counted in the return value -- confirm this is intended.
+         *bufptr = '\n';
+         return bufptr - bufx + 1;
+      }
+    }
+       // Destination exhausted without seeing a newline
+       return -1;
+}
+
+/**
+ * Scheduler callback: refreshes the configuration, makes sure the socket
+ * thread is running, then turns up to _maxBatchSize queued Syslog events
+ * into a single FlowFile routed to Success.
+ *
+ * A change of the Protocol or Port property raises _resetServerSocket so the
+ * socket thread rebuilds its listener. When no event is available the
+ * context is yielded and nothing is emitted.
+ *
+ * @param context gives access to the configured property values
+ * @param session used to create, write and transfer the FlowFile
+ */
+void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       std::string value;
+       bool needResetServerSocket = false;
+       if (context->getProperty(Protocol.getName(), value))
+       {
+               if (_protocol != value)
+                       needResetServerSocket = true;
+               _protocol = value;
+       }
+       if (context->getProperty(RecvBufSize.getName(), value))
+       {
+               Property::StringToInt(value, _recvBufSize);
+       }
+       if (context->getProperty(MaxSocketBufSize.getName(), value))
+       {
+               Property::StringToInt(value, _maxSocketBufSize);
+       }
+       if (context->getProperty(MaxConnections.getName(), value))
+       {
+               Property::StringToInt(value, _maxConnections);
+       }
+       if (context->getProperty(MessageDelimiter.getName(), value))
+       {
+               _messageDelimiter = value;
+       }
+       if (context->getProperty(ParseMessages.getName(), value))
+       {
+               Property::StringToBool(value, _parseMessages);
+       }
+       if (context->getProperty(Port.getName(), value))
+       {
+               int64_t oldPort = _port;
+               Property::StringToInt(value, _port);
+               if (_port != oldPort)
+                       needResetServerSocket = true;
+       }
+       if (context->getProperty(MaxBatchSize.getName(), value))
+       {
+               Property::StringToInt(value, _maxBatchSize);
+       }
+
+       if (needResetServerSocket)
+               _resetServerSocket = true;
+
+       startSocketThread();
+
+       // read from the event queue
+       if (isEventQueueEmpty())
+       {
+               context->yield();
+               return;
+       }
+
+       std::queue<SysLogEvent> eventQueue;
+       pollEvent(eventQueue, _maxBatchSize);
+       if (eventQueue.empty())
+       {
+               // Nothing was handed over; without this guard the attribute
+               // calls at the end would dereference a NULL flow file.
+               context->yield();
+               return;
+       }
+
+       FlowFileRecord *flowFile = session->create();
+       if (!flowFile)
+       {
+               // The events were already removed from the queue, so their
+               // payload buffers must be released here to avoid leaking them.
+               while (!eventQueue.empty())
+               {
+                       delete[] eventQueue.front().payload;
+                       eventQueue.pop();
+               }
+               return;
+       }
+
+       bool firstEvent = true;
+       while (!eventQueue.empty())
+       {
+               SysLogEvent event = eventQueue.front();
+               eventQueue.pop();
+               ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
+               if (firstEvent)
+               {
+                       // The first event creates the content, later ones are appended
+                       session->write(flowFile, &callback);
+                       firstEvent = false;
+               }
+               else
+               {
+                       session->append(flowFile, &callback);
+               }
+               delete[] event.payload;
+       }
+       flowFile->addAttribute("syslog.protocol", _protocol);
+       flowFile->addAttribute("syslog.port", std::to_string(_port));
+       session->transfer(flowFile, Success);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/LogAttribute.cpp b/libminifi/src/LogAttribute.cpp
new file mode 100644
index 0000000..82130f8
--- /dev/null
+++ b/libminifi/src/LogAttribute.cpp
@@ -0,0 +1,158 @@
+/**
+ * @file LogAttribute.cpp
+ * LogAttribute class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <sstream>
+#include <string.h>
+#include <iostream>
+
+#include "TimeUtil.h"
+#include "LogAttribute.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+// Name constant for this processor.
+const std::string LogAttribute::ProcessorName("LogAttribute");
+// Properties supported by LogAttribute. The three constructor arguments are
+// presumably (name, description, default value) -- confirm against Property.
+// NOTE(review): onTrigger() in this file reads only "Log Level", "Log prefix"
+// and "Log Payload"; the two attribute-list properties are not consulted there.
+Property LogAttribute::LogLevel("Log Level", "The Log Level to use when 
logging the Attributes", "info");
+Property LogAttribute::AttributesToLog("Attributes to Log", "A comma-separated 
list of Attributes to Log. If not specified, all attributes will be logged.", 
"");
+Property LogAttribute::AttributesToIgnore("Attributes to Ignore", "A 
comma-separated list of Attributes to ignore. If not specified, no attributes 
will be ignored.", "");
+Property LogAttribute::LogPayload("Log Payload",
+               "If true, the FlowFile's payload will be logged, in addition to 
its attributes; otherwise, just the Attributes will be logged.", "false");
+Property LogAttribute::LogPrefix("Log prefix",
+               "Log prefix appended to the log lines. It helps to distinguish 
the output of multiple LogAttribute processors.", "");
+// The single relationship every processed flow file is transferred to.
+Relationship LogAttribute::Success("success", "success operational on the flow 
record");
+
+/**
+ * Registers the properties and the relationship that this processor supports.
+ */
+void LogAttribute::initialize()
+{
+       //! Supported properties
+       std::set<Property> supportedProperties{
+               LogLevel,
+               AttributesToLog,
+               AttributesToIgnore,
+               LogPayload,
+               LogPrefix
+       };
+       setSupportedProperties(supportedProperties);
+
+       //! Supported relationships
+       std::set<Relationship> supportedRelationships{Success};
+       setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Takes one flow file from the session, logs a description of it at the
+ * configured level and transfers it to Success.
+ *
+ * The report contains the UUID, entry and lineage start dates, size and
+ * offset, every attribute, the content claim path (when there is a claim)
+ * and, if "Log Payload" is true and the content is at most 1 MB, a hex dump
+ * of the payload with 16 bytes per line.
+ *
+ * @param context gives access to the configured property values
+ * @param session supplies the flow file and performs the transfer
+ */
+void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       std::string dashLine = "--------------------------------------------------";
+       LogAttrLevel level = LogAttrLevelInfo;
+       bool logPayload = false;
+       std::ostringstream message;
+
+       FlowFileRecord *flow = session->get();
+
+       // Nothing queued for this processor
+       if (!flow)
+               return;
+
+       std::string value;
+       if (context->getProperty(LogLevel.getName(), value))
+       {
+               logLevelStringToEnum(value, level);
+       }
+       if (context->getProperty(LogPrefix.getName(), value))
+       {
+               // The prefix replaces the default separator line
+               dashLine = "-----" + value + "-----";
+       }
+       if (context->getProperty(LogPayload.getName(), value))
+       {
+               Property::StringToBool(value, logPayload);
+       }
+
+       message << "Logging for flow file " << "\n";
+       message << dashLine;
+       message << "\nStandard FlowFile Attributes";
+       message << "\n" << "UUID:" << flow->getUUIDStr();
+       message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
+       message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate());
+       message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset();
+       message << "\nFlowFile Attributes Map Content";
+       std::map<std::string, std::string> attrs = flow->getAttributes();
+       for (std::map<std::string, std::string>::iterator it = attrs.begin(); it != attrs.end(); ++it)
+       {
+               message << "\n" << "key:" << it->first << " value:" << it->second;
+       }
+       message << "\nFlowFile Resource Claim Content";
+       ResourceClaim *claim = flow->getResourceClaim();
+       if (claim)
+       {
+               message << "\n" << "Content Claim:" << claim->getContentFullPath();
+       }
+       // Only dump payloads of at most 1 MB
+       if (logPayload && flow->getSize() <= 1024*1024)
+       {
+               message << "\n" << "Payload:" << "\n";
+               ReadCallback callback(flow->getSize());
+               session->read(flow, &callback);
+               for (unsigned int i = 0, j = 0; i < callback._readSize; i++)
+               {
+                       char temp[8];
+                       // Bounded formatting: two hex digits, a space and the terminator
+                       snprintf(temp, sizeof(temp), "%02x ", (unsigned char) (callback._buffer[i]));
+                       message << temp;
+                       j++;
+                       // Break the dump after every 16 bytes
+                       if (j == 16)
+                       {
+                               message << '\n';
+                               j = 0;
+                       }
+               }
+       }
+       // No std::ends here: on an ostringstream it would append a literal NUL
+       // character to the resulting std::string.
+       message << "\n" << dashLine;
+       std::string output = message.str();
+
+       switch (level)
+       {
+       case LogAttrLevelInfo:
+               _logger->log_info("%s", output.c_str());
+               break;
+       case LogAttrLevelDebug:
+               _logger->log_debug("%s", output.c_str());
+               break;
+       case LogAttrLevelError:
+               _logger->log_error("%s", output.c_str());
+               break;
+       case LogAttrLevelTrace:
+               _logger->log_trace("%s", output.c_str());
+               break;
+       case LogAttrLevelWarn:
+               _logger->log_warn("%s", output.c_str());
+               break;
+       default:
+               break;
+       }
+
+       // Test Import
+       /*
+       FlowFileRecord *importRecord = session->create();
+       session->import(claim->getContentFullPath(), importRecord);
+       session->transfer(importRecord, Success); */
+
+       // Transfer to the relationship
+       session->transfer(flow, Success);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Logger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Logger.cpp b/libminifi/src/Logger.cpp
new file mode 100644
index 0000000..984f609
--- /dev/null
+++ b/libminifi/src/Logger.cpp
@@ -0,0 +1,27 @@
+/**
+ * @file Logger.cpp
+ * Logger class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "Logger.h"
+
+// Definition of the static Logger instance pointer; it starts out as NULL.
+Logger *Logger::_logger = NULL;
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
new file mode 100644
index 0000000..70ee9d7
--- /dev/null
+++ b/libminifi/src/ProcessGroup.cpp
@@ -0,0 +1,314 @@
+/**
+ * @file ProcessGroup.cpp
+ * ProcessGroup class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+
+#include "ProcessGroup.h"
+#include "Processor.h"
+
+/**
+ * Creates a process group.
+ * @param type   kind of group (compared against ROOT_PROCESS_GROUP elsewhere)
+ * @param name   display name, used in log messages
+ * @param uuid   identifier to copy; when null a new UUID is generated
+ * @param parent enclosing process group
+ */
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent)
+: _name(name),
+  _type(type),
+  _parentProcessGroup(parent)
+{
+       _logger = Logger::getLogger();
+       _transmitting = false;
+       _yieldPeriodMsec = 0;
+
+       if (uuid)
+               uuid_copy(_uuid, uuid);
+       else
+               // No identifier supplied by the caller: generate one
+               uuid_generate(_uuid);
+
+       _logger->log_info("ProcessGroup %s created", _name.c_str());
+}
+
+/**
+ * Destroys everything stored in this group: each connection is drained and
+ * deleted, then the child process groups and the processors are deleted.
+ */
+ProcessGroup::~ProcessGroup()
+{
+       for (Connection *connection : _connections)
+       {
+               // Drain the connection before releasing it
+               connection->drain();
+               delete connection;
+       }
+
+       for (ProcessGroup *childGroup : _childProcessGroups)
+               delete childGroup;
+
+       for (Processor *processor : _processors)
+               delete processor;
+}
+
+/**
+ * @return true when this group's type is ROOT_PROCESS_GROUP
+ */
+bool ProcessGroup::isRootProcessGroup()
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+       const bool isRoot = (ROOT_PROCESS_GROUP == _type);
+       return isRoot;
+}
+
+/**
+ * Adds a processor to this group; a processor that is already a member is ignored.
+ * @param processor processor to register
+ */
+void ProcessGroup::addProcessor(Processor *processor)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       // insert() reports through .second whether the element was newly added
+       const bool added = _processors.insert(processor).second;
+       if (added)
+       {
+               _logger->log_info("Add processor %s into process group %s",
+                               processor->getName().c_str(), _name.c_str());
+       }
+}
+
+/**
+ * Removes a processor from this group; does nothing when it is not a member.
+ * The processor object itself is not deleted.
+ * @param processor processor to unregister
+ */
+void ProcessGroup::removeProcessor(Processor *processor)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       // erase() by key returns how many elements were removed
+       const bool removed = _processors.erase(processor) > 0;
+       if (removed)
+       {
+               _logger->log_info("Remove processor %s from process group %s",
+                               processor->getName().c_str(), _name.c_str());
+       }
+}
+
+/**
+ * Adds a child process group; a child that is already registered is ignored.
+ * @param child process group to nest inside this one
+ */
+void ProcessGroup::addProcessGroup(ProcessGroup *child)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       // insert() reports through .second whether the element was newly added
+       const bool added = _childProcessGroups.insert(child).second;
+       if (added)
+       {
+               _logger->log_info("Add child process group %s into process group %s",
+                               child->getName().c_str(), _name.c_str());
+       }
+}
+
+/**
+ * Removes a child process group; does nothing when it is not registered.
+ * The child object itself is not deleted.
+ * @param child process group to detach from this one
+ */
+void ProcessGroup::removeProcessGroup(ProcessGroup *child)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       // erase() by key returns how many elements were removed
+       const bool removed = _childProcessGroups.erase(child) > 0;
+       if (removed)
+       {
+               _logger->log_info("Remove child process group %s from process group %s",
+                               child->getName().c_str(), _name.c_str());
+       }
+}
+
+/**
+ * Schedules every eligible processor of this group and, recursively, of its
+ * child groups. A processor is scheduled when it is not running, is not
+ * DISABLED and uses the TIMER_DRIVEN strategy.
+ * Exceptions are logged at debug level and rethrown.
+ * @param timeScheduler scheduling agent that receives the processors
+ */
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       try
+       {
+               // Processors owned directly by this group
+               for (Processor *processor : _processors)
+               {
+                       if (processor->isRunning())
+                               continue;
+                       if (processor->getScheduledState() == DISABLED)
+                               continue;
+                       if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
+                               timeScheduler->schedule(processor);
+               }
+
+               // Then descend into the nested groups
+               for (ProcessGroup *childGroup : _childProcessGroups)
+                       childGroup->startProcessing(timeScheduler);
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception during process group start processing");
+               throw;
+       }
+}
+
+/**
+ * Unschedules every TIMER_DRIVEN processor of this group and, recursively,
+ * of its child groups. Exceptions are logged at debug level and rethrown.
+ * @param timeScheduler scheduling agent the processors are removed from
+ */
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       try
+       {
+               // Processors owned directly by this group
+               for (Processor *processor : _processors)
+               {
+                       if (processor->getSchedulingStrategy() != TIMER_DRIVEN)
+                               continue;
+                       timeScheduler->unschedule(processor);
+               }
+
+               // Then descend into the nested groups
+               for (ProcessGroup *childGroup : _childProcessGroups)
+                       childGroup->stopProcessing(timeScheduler);
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception during process group stop processing");
+               throw;
+       }
+}
+
+/**
+ * Looks up a processor by UUID in this group first, then depth-first in the
+ * child groups.
+ * No lock is taken here; addConnection() and removeConnection() call this
+ * method while already holding _mtx.
+ * @param uuid identifier to search for
+ * @return the matching processor, or NULL when none is found
+ */
+Processor *ProcessGroup::findProcessor(uuid_t uuid)
+{
+       for (Processor *candidate : _processors)
+       {
+               uuid_t candidateUUID;
+               if (!candidate->getUUID(candidateUUID))
+                       continue;
+               if (uuid_compare(candidateUUID, uuid) == 0)
+                       return candidate;
+       }
+
+       for (ProcessGroup *childGroup : _childProcessGroups)
+       {
+               Processor *found = childGroup->findProcessor(uuid);
+               if (found)
+                       return found;
+       }
+
+       return NULL;
+}
+
+/**
+ * Looks up a processor by name in this group first, then depth-first in the
+ * child groups. Every processor inspected in this group is logged at debug level.
+ * @param processorName name to search for
+ * @return the first matching processor, or NULL when none is found
+ */
+Processor *ProcessGroup::findProcessor(std::string processorName)
+{
+       for (Processor *candidate : _processors)
+       {
+               _logger->log_debug("Current processor is %s", candidate->getName().c_str());
+               if (candidate->getName() == processorName)
+                       return candidate;
+       }
+
+       for (ProcessGroup *childGroup : _childProcessGroups)
+       {
+               Processor *found = childGroup->findProcessor(processorName);
+               if (found)
+                       return found;
+       }
+
+       return NULL;
+}
+
+/**
+ * Sets a property on every processor with the given name, in this group and
+ * recursively in all child groups.
+ * @param processorName name of the processor(s) to update
+ * @param propertyName  property to set
+ * @param propertyValue new value
+ */
+void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       for (Processor *processor : _processors)
+       {
+               if (processor->getName() != processorName)
+                       continue;
+               processor->setProperty(propertyName, propertyValue);
+       }
+
+       // Propagate the update to the nested groups
+       for (ProcessGroup *childGroup : _childProcessGroups)
+               childGroup->updatePropertyValue(processorName, propertyName, propertyValue);
+}
+
+/**
+ * Registers a connection with this group and attaches it to its source and
+ * destination processors when they can be found by UUID. A connection that
+ * is already registered is ignored.
+ * @param connection connection to add
+ */
+void ProcessGroup::addConnection(Connection *connection)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       // Already registered: nothing to do
+       if (_connections.find(connection) != _connections.end())
+               return;
+
+       _connections.insert(connection);
+       _logger->log_info("Add connection %s into process group %s",
+                       connection->getName().c_str(), _name.c_str());
+
+       uuid_t sourceUUID;
+       connection->getSourceProcessorUUID(sourceUUID);
+       Processor *source = this->findProcessor(sourceUUID);
+       if (source)
+               source->addConnection(connection);
+
+       uuid_t destinationUUID;
+       connection->getDestinationProcessorUUID(destinationUUID);
+       Processor *destination = this->findProcessor(destinationUUID);
+       // A self-loop is attached only once, through the source branch above
+       if (destination && destination != source)
+               destination->addConnection(connection);
+}
+
+/**
+ * Unregisters a connection from this group and detaches it from its source
+ * and destination processors when they can be found by UUID. Does nothing
+ * when the connection is not registered. The connection is not deleted.
+ * @param connection connection to remove
+ */
+void ProcessGroup::removeConnection(Connection *connection)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       if (_connections.find(connection) != _connections.end())
+       {
+               // The connection is registered in this process group: remove it
+               _connections.erase(connection);
+               _logger->log_info("Remove connection %s from process group %s",
+                               connection->getName().c_str(), _name.c_str());
+               uuid_t sourceUUID;
+               Processor *source = NULL;
+               connection->getSourceProcessorUUID(sourceUUID);
+               source = this->findProcessor(sourceUUID);
+               if (source)
+                       source->removeConnection(connection);
+               Processor *destination = NULL;
+               uuid_t destinationUUID;
+               connection->getDestinationProcessorUUID(destinationUUID);
+               destination = this->findProcessor(destinationUUID);
+               // A self-loop is detached only once, through the source branch above
+               if (destination && destination != source)
+                       destination->removeConnection(connection);
+       }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp
new file mode 100644
index 0000000..4f526c3
--- /dev/null
+++ b/libminifi/src/ProcessSession.cpp
@@ -0,0 +1,731 @@
+/**
+ * @file ProcessSession.cpp
+ * ProcessSession class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <iostream>
+
+#include "ProcessSession.h"
+
+/**
+ * Create a new FlowFile with no attributes and track it in this session.
+ *
+ * The record is stored in the session's added-FlowFile map under its UUID
+ * string.
+ *
+ * @return the newly allocated record
+ */
+FlowFileRecord* ProcessSession::create()
+{
+       std::map<std::string, std::string> noAttributes;
+       FlowFileRecord *created = new FlowFileRecord(noAttributes);
+       if (!created)
+               return created;
+
+       const std::string uuid = created->getUUIDStr();
+       _addedFlowFiles[uuid] = created;
+       _logger->log_debug("Create FlowFile with UUID %s", uuid.c_str());
+       return created;
+}
+
+/**
+ * Create a new FlowFile in this session that descends from parent.
+ *
+ * All parent attributes are copied except ALTERNATE_IDENTIFIER,
+ * DISCARD_REASON and UUID. The child takes over the parent's lineage start
+ * date and lineage identifiers, and the parent's UUID string is added to
+ * those identifiers. No content claim is copied.
+ *
+ * @param parent FlowFile to inherit from
+ * @return the new child record
+ */
+FlowFileRecord* ProcessSession::create(FlowFileRecord *parent)
+{
+       FlowFileRecord *child = this->create();
+       if (!child)
+               return child;
+
+       std::map<std::string, std::string> inherited = parent->getAttributes();
+       for (std::map<std::string, std::string>::iterator attr = inherited.begin();
+                       attr != inherited.end(); ++attr)
+       {
+               // Special attributes describe the parent itself and are not inherited
+               const bool special =
+                               attr->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
+                               attr->first == FlowAttributeKey(DISCARD_REASON) ||
+                               attr->first == FlowAttributeKey(UUID);
+               if (!special)
+                       child->setAttribute(attr->first, attr->second);
+       }
+
+       child->_lineageStartDate = parent->_lineageStartDate;
+       child->_lineageIdentifiers = parent->_lineageIdentifiers;
+       child->_lineageIdentifiers.insert(parent->_uuidStr);
+       return child;
+}
+
+/**
+ * Create a child of parent that shares the parent's content.
+ *
+ * The child is built with create(parent). When the parent has a resource
+ * claim, the child points at the same claim with the same offset and size,
+ * and the claim's FlowFile owned count is increased.
+ *
+ * @param parent FlowFile to clone
+ * @return the new record
+ */
+FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
+{
+       FlowFileRecord *copy = this->create(parent);
+       if (!copy)
+               return copy;
+
+       copy->_claim = parent->_claim;
+       if (!copy->_claim)
+               // Parent has no content, so there is nothing further to share
+               return copy;
+
+       copy->_offset = parent->_offset;
+       copy->_size = parent->_size;
+       copy->_claim->increaseFlowFileRecordOwnedCount();
+       return copy;
+}
+
+/**
+ * Clone a FlowFile while routing it to more than one connection.
+ *
+ * Unlike clone(), the new record is tracked in the session's cloned-FlowFile
+ * map rather than the added map. Attributes (except ALTERNATE_IDENTIFIER,
+ * DISCARD_REASON and UUID), lineage information and the resource claim with
+ * its offset and size are copied from parent; a shared claim gets its
+ * FlowFile owned count increased.
+ *
+ * @param parent FlowFile being transferred
+ * @return the new clone
+ */
+FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
+{
+       std::map<std::string, std::string> noAttributes;
+       FlowFileRecord *copy = new FlowFileRecord(noAttributes);
+       if (!copy)
+               return copy;
+
+       this->_clonedFlowFiles[copy->getUUIDStr()] = copy;
+       _logger->log_debug("Clone FlowFile with UUID %s during transfer", copy->getUUIDStr().c_str());
+
+       // Inherit the parent's attributes, leaving out the special ones
+       std::map<std::string, std::string> inherited = parent->getAttributes();
+       std::map<std::string, std::string>::iterator attr = inherited.begin();
+       while (attr != inherited.end())
+       {
+               const bool special =
+                               attr->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
+                               attr->first == FlowAttributeKey(DISCARD_REASON) ||
+                               attr->first == FlowAttributeKey(UUID);
+               if (!special)
+                       copy->setAttribute(attr->first, attr->second);
+               ++attr;
+       }
+
+       // Lineage: same start date, parent's ancestors plus the parent itself
+       copy->_lineageStartDate = parent->_lineageStartDate;
+       copy->_lineageIdentifiers = parent->_lineageIdentifiers;
+       copy->_lineageIdentifiers.insert(parent->_uuidStr);
+
+       // Share the parent's content, if it has any
+       copy->_claim = parent->_claim;
+       if (copy->_claim)
+       {
+               copy->_offset = parent->_offset;
+               copy->_size = parent->_size;
+               copy->_claim->increaseFlowFileRecordOwnedCount();
+       }
+
+       return copy;
+}
+
+/**
+ * Create a child of parent that covers only a slice of the parent's content.
+ *
+ * The child is built with create(parent). When the parent has a resource
+ * claim, the child shares that claim, starting offset bytes into the
+ * parent's content and spanning size bytes, and the claim's FlowFile owned
+ * count is increased. When the parent has no claim the child is returned
+ * without content and offset/size are not used.
+ *
+ * @param parent FlowFile to clone from
+ * @param offset start of the slice, relative to the parent's content
+ * @param size   length of the slice in bytes
+ * @return the new record, or NULL when offset + size exceeds the parent's
+ *         size (the half-built child is removed from the session and deleted)
+ */
+FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long size)
+{
+       FlowFileRecord *record = this->create(parent);
+       if (!record)
+               return NULL;
+
+       if (!parent->_claim)
+               return record;
+
+       if ((offset + size) > (long) parent->_size)
+       {
+               // Format specifiers match the argument widths (long / 64-bit size)
+               _logger->log_error("clone offset %ld and size %ld exceed parent size %llu",
+                               offset, size, (unsigned long long) parent->_size);
+               // Undo the registration done by create() before dropping the record
+               this->_addedFlowFiles.erase(record->getUUIDStr());
+               delete record;
+               return NULL;
+       }
+
+       // The slice starts at the requested offset inside the parent's own window
+       record->_offset = parent->_offset + offset;
+       record->_size = size;
+       // Copy Resource Claim
+       record->_claim = parent->_claim;
+       record->_claim->increaseFlowFileRecordOwnedCount();
+       return record;
+}
+
+/**
+ * Mark a FlowFile as deleted and record it in the session's deleted map,
+ * keyed by its UUID string.
+ *
+ * @param flow FlowFile to remove
+ */
+void ProcessSession::remove(FlowFileRecord *flow)
+{
+       flow->_markedDelete = true;
+       const std::string uuid = flow->getUUIDStr();
+       _deletedFlowFiles[uuid] = flow;
+}
+
+/**
+ * Set an attribute on a FlowFile by forwarding to FlowFileRecord::setAttribute.
+ *
+ * @param flow  FlowFile to modify
+ * @param key   attribute name
+ * @param value attribute value
+ */
+void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value)
+{
+       flow->setAttribute(key, value);
+}
+
+/**
+ * Remove an attribute from a FlowFile by forwarding to
+ * FlowFileRecord::removeAttribute.
+ *
+ * @param flow FlowFile to modify
+ * @param key  name of the attribute to drop
+ */
+void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
+{
+       flow->removeAttribute(key);
+}
+
+/**
+ * Penalize a FlowFile: its penalty expiration is set to the current time in
+ * milliseconds plus the penalization period of this session's processor.
+ *
+ * @param flow FlowFile to penalize
+ */
+void ProcessSession::penalize(FlowFileRecord *flow)
+{
+       auto now = getTimeMillis();
+       auto penaltyPeriod = this->_processContext->getProcessor()->getPenalizationPeriodMsec();
+       flow->_penaltyExpirationMs = now + penaltyPeriod;
+}
+
+/**
+ * Remember which relationship a FlowFile should be routed to. The mapping is
+ * keyed by the FlowFile's UUID string and is consulted by commit().
+ *
+ * @param flow         FlowFile to route
+ * @param relationship relationship to route it to
+ */
+void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship)
+{
+       const std::string uuid = flow->getUUIDStr();
+       _transferRelationship[uuid] = relationship;
+}
+
+/**
+ * Replace the content of a FlowFile with whatever the callback writes.
+ *
+ * A new ResourceClaim is created for DEFAULT_CONTENT_DIRECTORY, its content
+ * file is opened truncated in binary mode and passed to callback->process().
+ * On success the flow points at the new claim (offset 0, size = final stream
+ * position), the owned count of any previous claim is decreased and the new
+ * claim's count is increased. On failure the new claim is deleted, the
+ * problem is logged and the exception is rethrown.
+ *
+ * @param flow     FlowFile whose content is replaced
+ * @param callback writer invoked with the open output stream
+ */
+void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
+{
+       ResourceClaim *claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+
+       try
+       {
+               std::ofstream fs;
+               fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+               if (!fs.is_open())
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+
+               // Let the callback produce the content
+               callback->process(&fs);
+               const bool written = fs.good() && fs.tellp() >= 0;
+               if (!written)
+               {
+                       fs.close();
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+               }
+
+               flow->_size = fs.tellp();
+               flow->_offset = 0;
+               if (flow->_claim)
+               {
+                       // Release the content the flow pointed at before
+                       flow->_claim->decreaseFlowFileRecordOwnedCount();
+                       flow->_claim = NULL;
+               }
+               flow->_claim = claim;
+               claim->increaseFlowFileRecordOwnedCount();
+               fs.close();
+       }
+       catch (std::exception &exception)
+       {
+               // Detach the new claim from the flow, if it got that far, then drop it
+               if (flow && flow->_claim == claim)
+               {
+                       flow->_claim->decreaseFlowFileRecordOwnedCount();
+                       flow->_claim = NULL;
+               }
+               if (claim)
+                       delete claim;
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               if (flow && flow->_claim == claim)
+               {
+                       flow->_claim->decreaseFlowFileRecordOwnedCount();
+                       flow->_claim = NULL;
+               }
+               if (claim)
+                       delete claim;
+               _logger->log_debug("Caught Exception during process session write");
+               throw;
+       }
+}
+
+/**
+ * Append whatever the callback writes to the content of a FlowFile.
+ *
+ * A flow without a resource claim is handled by write(), which creates one.
+ * Otherwise the claim's content file is opened in binary append mode, passed
+ * to callback->process(), and the flow's size grows by the number of bytes
+ * the callback added. Failures are logged and rethrown.
+ *
+ * @param flow     FlowFile to extend
+ * @param callback writer invoked with the open output stream
+ */
+void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback)
+{
+       if (flow->_claim == NULL)
+       {
+               // No existed claim for append, we need to create new claim
+               write(flow, callback);
+               return;
+       }
+
+       ResourceClaim *claim = flow->_claim;
+
+       try
+       {
+               std::ofstream fs;
+               fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
+               if (!fs.is_open())
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+
+               // In append mode the put position is only moved to the end of the
+               // file by the first write; right after open() it may still be 0.
+               // Seek explicitly so the delta below counts only the appended bytes.
+               fs.seekp(0, std::ios_base::end);
+               const std::streampos oldPos = fs.tellp();
+
+               // Call the callback to write the content
+               callback->process(&fs);
+               if (!(fs.good() && fs.tellp() >= 0))
+               {
+                       fs.close();
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+               }
+
+               const uint64_t appendSize = fs.tellp() - oldPos;
+               flow->_size += appendSize;
+               fs.close();
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception during process session append");
+               throw;
+       }
+}
+
+/**
+ * Let the callback read the content of a FlowFile.
+ *
+ * The content file of the flow's resource claim is opened in binary mode,
+ * positioned at the flow's offset and passed to callback->process(). A
+ * missing claim, an open failure or a failed seek raise a
+ * FILE_OPERATION_EXCEPTION; every exception is logged and rethrown.
+ *
+ * @param flow     FlowFile to read
+ * @param callback reader invoked with the open input stream
+ */
+void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
+{
+       try
+       {
+               ResourceClaim *claim = flow->_claim;
+               if (claim == NULL)
+                       // No existed claim for read, we throw exception
+                       throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
+
+               std::ifstream fs;
+               fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
+               if (!fs.is_open())
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+
+               // Skip to where this flow's content starts inside the claim
+               fs.seekg(flow->_offset, fs.beg);
+               if (!fs.good())
+               {
+                       fs.close();
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
+               }
+
+               callback->process(&fs);
+               fs.close();
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception during process session read");
+               throw;
+       }
+}
+
+/**
+ * Copy an external file into a new resource claim and make it the content
+ * of a FlowFile.
+ *
+ * The source file is read from the given offset to its end and streamed into
+ * the content file of a freshly created ResourceClaim. On success the flow
+ * points at the new claim (offset 0, size = bytes written), the owned count
+ * of any previous claim is decreased, and the source file is removed unless
+ * keepSource is set. On failure the new claim is deleted, the problem is
+ * logged and the exception is rethrown.
+ *
+ * @param source     path of the file to import
+ * @param flow       FlowFile receiving the content
+ * @param keepSource when false the source file is deleted after the copy
+ * @param offset     position in the source file at which copying starts
+ */
+void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepSource, uint64_t offset)
+{
+       // The buffer is allocated before the claim and owned by a vector, so an
+       // allocation failure cannot leak the claim and no exit path has to free it
+       std::vector<char> buf(4096);
+       ResourceClaim *claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+
+       try
+       {
+               std::ofstream fs;
+               fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+               std::ifstream input;
+               input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+
+               if (!(fs.is_open() && input.is_open()))
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+
+               // Open the source file and stream to the flow file
+               input.seekg(offset, std::ios_base::beg);
+               while (input.good())
+               {
+                       input.read(&buf[0], buf.size());
+                       // gcount() equals the buffer size after a full read and the
+                       // remaining byte count after the final, short read
+                       fs.write(&buf[0], input.gcount());
+               }
+
+               if (!(fs.good() && fs.tellp() >= 0))
+               {
+                       fs.close();
+                       input.close();
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+               }
+
+               flow->_size = fs.tellp();
+               flow->_offset = 0;
+               if (flow->_claim)
+               {
+                       // Remove the old claim
+                       flow->_claim->decreaseFlowFileRecordOwnedCount();
+                       flow->_claim = NULL;
+               }
+               flow->_claim = claim;
+               claim->increaseFlowFileRecordOwnedCount();
+               fs.close();
+               input.close();
+               if (!keepSource)
+                       std::remove(source.c_str());
+       }
+       catch (std::exception &exception)
+       {
+               if (flow && flow->_claim == claim)
+               {
+                       flow->_claim->decreaseFlowFileRecordOwnedCount();
+                       flow->_claim = NULL;
+               }
+               if (claim)
+                       delete claim;
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               if (flow && flow->_claim == claim)
+               {
+                       flow->_claim->decreaseFlowFileRecordOwnedCount();
+                       flow->_claim = NULL;
+               }
+               if (claim)
+                       delete claim;
+               _logger->log_debug("Caught Exception during process session import");
+               throw;
+       }
+}
+
+/**
+ * Commit the session: route every FlowFile to its destination queue(s) and
+ * release everything the session no longer needs.
+ *
+ * Steps, in order:
+ *  1. For each updated and then each added FlowFile that is not marked
+ *     deleted, look up the relationship it was transferred to and the
+ *     processor's outgoing connections for it. The first connection receives
+ *     the record itself, every further connection receives a clone made with
+ *     cloneDuringTransfer(). A relationship without connections removes the
+ *     record if it is auto-terminated and is an error otherwise.
+ *  2. Put updated, added and cloned records on their connection; records
+ *     without a connection are deleted.
+ *  3. Delete the records in the deleted map and the snapshots, then clear
+ *     all session maps.
+ *
+ * Throws a PROCESS_SESSION_EXCEPTION when a record has no transfer
+ * relationship, when a non auto-terminated relationship has no connection,
+ * or when cloning fails. Every exception is logged and rethrown.
+ */
+void ProcessSession::commit()
+{
+       typedef std::map<std::string, FlowFileRecord *> FlowFileMap;
+
+       try
+       {
+               // Step 1 helper: assign connections to the live records of one map
+               auto route = [this](FlowFileMap &flowFiles)
+               {
+                       for (FlowFileMap::iterator it = flowFiles.begin(); it != flowFiles.end(); ++it)
+                       {
+                               FlowFileRecord *record = it->second;
+                               if (record->_markedDelete)
+                                       continue;
+
+                               std::map<std::string, Relationship>::iterator itRelationship =
+                                               this->_transferRelationship.find(record->getUUIDStr());
+                               if (itRelationship == this->_transferRelationship.end())
+                                       // Can not find relationship for the flow
+                                       throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
+
+                               Relationship relationship = itRelationship->second;
+                               std::set<Connection *> connections =
+                                               this->_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
+                               if (connections.empty())
+                               {
+                                       if (!this->_processContext->getProcessor()->isAutoTerminated(relationship))
+                                       {
+                                               // Not auto-terminated, so a connection is required
+                                               std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
+                                               throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
+                                       }
+                                       // Auto-terminated: the record ends here
+                                       this->remove(record);
+                                       continue;
+                               }
+
+                               for (std::set<Connection *>::iterator itConnection = connections.begin();
+                                               itConnection != connections.end(); ++itConnection)
+                               {
+                                       Connection *connection(*itConnection);
+                                       if (itConnection == connections.begin())
+                                       {
+                                               // The first connection gets the record itself
+                                               record->_connection = connection;
+                                               continue;
+                                       }
+                                       // Every further connection gets its own clone
+                                       FlowFileRecord *cloneRecord = this->cloneDuringTransfer(record);
+                                       if (!cloneRecord)
+                                               throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
+                                       cloneRecord->_connection = connection;
+                               }
+                       }
+               };
+
+               // Step 2 helper: hand the live records of one map to their queue
+               auto deliver = [](FlowFileMap &flowFiles)
+               {
+                       for (FlowFileMap::iterator it = flowFiles.begin(); it != flowFiles.end(); ++it)
+                       {
+                               FlowFileRecord *record = it->second;
+                               if (record->_markedDelete)
+                                       continue;
+                               if (record->_connection)
+                                       record->_connection->put(record);
+                               else
+                                       delete record;
+                       }
+               };
+
+               // Step 3 helper: free every record of one map
+               auto destroy = [](FlowFileMap &flowFiles)
+               {
+                       for (FlowFileMap::iterator it = flowFiles.begin(); it != flowFiles.end(); ++it)
+                               delete it->second;
+               };
+
+               // First we clone the flow record based on the transfered relationship
+               route(_updatedFlowFiles);
+               route(_addedFlowFiles);
+
+               // Send the flow files, including the clones made above, to their queue
+               deliver(_updatedFlowFiles);
+               deliver(_addedFlowFiles);
+               deliver(_clonedFlowFiles);
+
+               // Delete the deleted flow files and the snapshots
+               destroy(_deletedFlowFiles);
+               destroy(_originalFlowFiles);
+
+               // All done
+               _updatedFlowFiles.clear();
+               _addedFlowFiles.clear();
+               _clonedFlowFiles.clear();
+               _deletedFlowFiles.clear();
+               _originalFlowFiles.clear();
+               _logger->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str());
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception during process session commit");
+               throw;
+       }
+}
+
+
+/**
+ * Roll the session back.
+ *
+ * Each snapshot in the original-FlowFile map is put back on its original
+ * connection with its snapshot flag cleared; snapshots without an original
+ * connection are deleted. Cloned, added and updated records are deleted.
+ * All session maps are cleared; the deleted map is cleared without deleting
+ * its entries. Every exception is logged and rethrown.
+ */
+void ProcessSession::rollback()
+{
+       typedef std::map<std::string, FlowFileRecord *>::iterator RecordIter;
+
+       try
+       {
+               // Requeue the snapshot of the flowfile back
+               for (RecordIter snap = _originalFlowFiles.begin(); snap != _originalFlowFiles.end(); ++snap)
+               {
+                       FlowFileRecord *original = snap->second;
+                       if (!original->_orginalConnection)
+                       {
+                               delete original;
+                               continue;
+                       }
+                       original->_snapshot = false;
+                       original->_orginalConnection->put(original);
+               }
+               _originalFlowFiles.clear();
+
+               // Drop the clones made during transfer
+               for (RecordIter cloned = _clonedFlowFiles.begin(); cloned != _clonedFlowFiles.end(); ++cloned)
+                       delete cloned->second;
+               _clonedFlowFiles.clear();
+
+               // Drop what the session created
+               for (RecordIter added = _addedFlowFiles.begin(); added != _addedFlowFiles.end(); ++added)
+                       delete added->second;
+               _addedFlowFiles.clear();
+
+               // Drop the working copies; the snapshots were requeued above
+               for (RecordIter updated = _updatedFlowFiles.begin(); updated != _updatedFlowFiles.end(); ++updated)
+                       delete updated->second;
+               _updatedFlowFiles.clear();
+
+               _deletedFlowFiles.clear();
+               _logger->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str());
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception during process session roll back");
+               throw;
+       }
+}
+
+/**
+ * Fetch the next FlowFile from the processor's incoming connections.
+ *
+ * Connections are obtained one at a time from getNextIncomingConnection()
+ * and polled until one yields a record, the processor returns no connection,
+ * or the connection that was tried first comes up again. Records reported as
+ * expired by poll() are deleted. A returned record is registered in the
+ * updated map with its delete mark cleared, and a duplicate of it is stored
+ * in the original map as a snapshot.
+ *
+ * @return the polled FlowFile, or NULL when nothing is available
+ */
+FlowFileRecord *ProcessSession::get()
+{
+       Connection *first = _processContext->getProcessor()->getNextIncomingConnection();
+       Connection *current = first;
+
+       while (current != NULL)
+       {
+               std::set<FlowFileRecord *> expired;
+               FlowFileRecord *polled = current->poll(expired);
+
+               // Remove expired flow record
+               for (std::set<FlowFileRecord *>::iterator gone = expired.begin(); gone != expired.end(); ++gone)
+                       delete (*gone);
+
+               if (polled)
+               {
+                       // add the flow record to the current process session update map
+                       polled->_markedDelete = false;
+                       _updatedFlowFiles[polled->getUUIDStr()] = polled;
+
+                       // save a snapshot
+                       std::map<std::string, std::string> noAttributes;
+                       FlowFileRecord *snapshot = new FlowFileRecord(noAttributes);
+                       _logger->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
+                       snapshot->duplicate(polled);
+                       _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
+                       return polled;
+               }
+
+               current = _processContext->getProcessor()->getNextIncomingConnection();
+               if (current == first)
+                       // Back at the starting connection: every queue was tried
+                       break;
+       }
+
+       return NULL;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp
new file mode 100644
index 0000000..cc136dc
--- /dev/null
+++ b/libminifi/src/Processor.cpp
@@ -0,0 +1,451 @@
+/**
+ * @file Processor.cpp
+ * Processor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+/**
+ * Create a processor in the DISABLED state with default scheduling settings.
+ *
+ * @param name processor name, copied into _name
+ * @param uuid identifier to adopt; when null a fresh UUID is generated
+ */
+Processor::Processor(std::string name, uuid_t uuid)
+: _name(name)
+{
+       // Identity: adopt the caller's UUID or generate a new one
+       if (uuid)
+               uuid_copy(_uuid, uuid);
+       else
+               uuid_generate(_uuid);
+
+       char text[37]; // uuid_unparse writes 36 characters plus the terminating NUL
+       uuid_unparse(_uuid, text);
+       _uuidStr = text;
+
+       _logger = Logger::getLogger();
+
+       // Setup the default values
+       _state = DISABLED;
+       _strategy = TIMER_DRIVEN;
+       _lossTolerant = false;
+       _triggerWhenEmpty = false;
+       _maxConcurrentTasks = 1;
+       _activeTasks = 0;
+       _schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS;
+       _runDurantionNano = 0;
+       _yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
+       _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+       _yieldExpiration = 0;
+       _incomingConnectionsIter = this->_incomingConnections.begin();
+
+       _logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str());
+}
+
+/** Destructor; the body is intentionally empty. */
+Processor::~Processor()
+{
+}
+
+/**
+ * @return true only when the state is RUNNING and at least one task is active.
+ */
+bool Processor::isRunning()
+{
+       if (_state != RUNNING)
+               return false;
+
+       return _activeTasks > 0;
+}
+
+/**
+ * Replace the supported-property table with the given set.
+ *
+ * @param properties properties to register, keyed by their name
+ * @return false (nothing changed) while the processor is running, true otherwise
+ */
+bool Processor::setSupportedProperties(std::set<Property> properties)
+{
+       if (isRunning())
+       {
+               _logger->log_info("Can not set processor property while the process %s is running",
+                               _name.c_str());
+               return false;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       _properties.clear();
+       // Each element is taken by value so the accessors run on a mutable copy
+       for (Property entry : properties)
+       {
+               _properties[entry.getName()] = entry;
+               _logger->log_info("Processor %s supported property name %s", _name.c_str(), entry.getName().c_str());
+       }
+
+       return true;
+}
+
+/**
+ * Replace the supported-relationship table with the given set.
+ *
+ * @param relationships relationships to register, keyed by their name
+ * @return false (nothing changed) while the processor is running, true otherwise
+ */
+bool Processor::setSupportedRelationships(std::set<Relationship> relationships)
+{
+       if (isRunning())
+       {
+               _logger->log_info("Can not set processor supported relationship while the process %s is running",
+                               _name.c_str());
+               return false;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       _relationships.clear();
+       // Each element is taken by value so the accessors run on a mutable copy
+       for (Relationship entry : relationships)
+       {
+               _relationships[entry.getName()] = entry;
+               _logger->log_info("Processor %s supported relationship name %s", _name.c_str(), entry.getName().c_str());
+       }
+
+       return true;
+}
+
+/**
+ * Replace the auto-terminated relationship table with the given set.
+ *
+ * @param relationships relationships to mark as auto terminated, keyed by name
+ * @return false (nothing changed) while the processor is running, true otherwise
+ */
+bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationships)
+{
+       if (isRunning())
+       {
+               _logger->log_info("Can not set processor auto terminated relationship while the process %s is running",
+                               _name.c_str());
+               return false;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       _autoTerminatedRelationships.clear();
+       // Each element is taken by value so the accessors run on a mutable copy
+       for (Relationship entry : relationships)
+       {
+               _autoTerminatedRelationships[entry.getName()] = entry;
+               _logger->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), entry.getName().c_str());
+       }
+
+       return true;
+}
+
+/**
+ * Tell whether a relationship is registered as auto terminated.
+ *
+ * The mutex is taken only while the processor is not running: the setters
+ * refuse to modify the table while it runs, so the lookup is done without
+ * locking in that state.
+ *
+ * @param relationship relationship whose name is looked up
+ * @return true when the name is present in _autoTerminatedRelationships
+ */
+bool Processor::isAutoTerminated(Relationship relationship)
+{
+       // Deferred RAII lock: released on every exit path, including exceptions
+       std::unique_lock<std::mutex> guard(_mtx, std::defer_lock);
+       if (!isRunning())
+               guard.lock();
+
+       return _autoTerminatedRelationships.find(relationship.getName())
+                       != _autoTerminatedRelationships.end();
+}
+
+/**
+ * Tell whether a relationship is registered as supported.
+ *
+ * The mutex is taken only while the processor is not running: the setters
+ * refuse to modify the table while it runs, so the lookup is done without
+ * locking in that state.
+ *
+ * @param relationship relationship whose name is looked up
+ * @return true when the name is present in _relationships
+ */
+bool Processor::isSupportedRelationship(Relationship relationship)
+{
+       // Deferred RAII lock: released on every exit path, including exceptions
+       std::unique_lock<std::mutex> guard(_mtx, std::defer_lock);
+       if (!isRunning())
+               guard.lock();
+
+       return _relationships.find(relationship.getName()) != _relationships.end();
+}
+
+/**
+ * Read the current value of a supported property.
+ *
+ * @param name  property name to look up
+ * @param value receives the property value; left untouched when not found
+ * @return true when the property exists, false otherwise
+ */
+bool Processor::getProperty(std::string name, std::string &value)
+{
+       // The lock is taken only while the processor is not running, to avoid a
+       // race condition with the calls that rebuild the property table.
+       // Deferred RAII lock: released on every exit path, including exceptions.
+       std::unique_lock<std::mutex> guard(_mtx, std::defer_lock);
+       if (!isRunning())
+               guard.lock();
+
+       std::map<std::string, Property>::iterator it = _properties.find(name);
+       if (it == _properties.end())
+               return false;
+
+       value = it->second.getValue();
+       return true;
+}
+
+/**
+ * Assign a new value to an already supported property.
+ *
+ * @param name  property name to look up
+ * @param value value to store
+ * @return true when the property exists and was updated, false otherwise
+ */
+bool Processor::setProperty(std::string name, std::string value)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       std::map<std::string, Property>::iterator found = _properties.find(name);
+       if (found == _properties.end())
+               return false;
+
+       // Update a copy, then store it back under the property's own name
+       Property updated = found->second;
+       updated.setValue(value);
+       _properties[updated.getName()] = updated;
+       _logger->log_info("Processor %s property name %s value %s", _name.c_str(), updated.getName().c_str(), value.c_str());
+       return true;
+}
+
+/**
+ * Look up the outgoing connections registered for a relationship name.
+ *
+ * @param relationship relationship name used as the lookup key
+ * @return a copy of the registered connection set, or an empty set when the
+ *         relationship has no entry
+ */
+std::set<Connection *> Processor::getOutGoingConnections(std::string relationship)
+{
+       std::map<std::string, std::set<Connection *>>::iterator found = _outGoingConnections.find(relationship);
+       if (found == _outGoingConnections.end())
+               return std::set<Connection *>();
+
+       return found->second;
+}
+
+/**
+ * Attach a connection to this processor.
+ *
+ * The connection is registered as incoming when its destination UUID equals
+ * this processor's UUID, and as outgoing (under its relationship name) when
+ * its source UUID does; both can apply to the same connection.
+ *
+ * @param connection connection to attach
+ * @return true when the connection was newly registered on at least one side;
+ *         false when the processor is running or nothing was added
+ */
+bool Processor::addConnection(Connection *connection)
+{
+       if (isRunning())
+       {
+               _logger->log_info("Can not add connection while the process %s is running",
+                               _name.c_str());
+               return false;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       uuid_t sourceId;
+       uuid_t destinationId;
+       connection->getSourceProcessorUUID(sourceId);
+       connection->getDestinationProcessorUUID(destinationId);
+
+       bool added = false;
+
+       // Connection is destination to the current processor
+       if (uuid_compare(_uuid, destinationId) == 0
+                       && _incomingConnections.insert(connection).second)
+       {
+               connection->setDestinationProcessor(this);
+               _logger->log_info("Add connection %s into Processor %s incoming connection",
+                               connection->getName().c_str(), _name.c_str());
+               // restart the rotation used by getNextIncomingConnection()
+               _incomingConnectionsIter = this->_incomingConnections.begin();
+               added = true;
+       }
+
+       // Connection is source from the current processor
+       if (uuid_compare(_uuid, sourceId) == 0)
+       {
+               std::string relationship = connection->getRelationship().getName();
+               // operator[] creates the set for a relationship seen for the first
+               // time; insert() reports whether the connection was not there yet
+               if (_outGoingConnections[relationship].insert(connection).second)
+               {
+                       connection->setSourceProcessor(this);
+                       _logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s",
+                                       connection->getName().c_str(), _name.c_str(), relationship.c_str());
+                       added = true;
+               }
+       }
+
+       return added;
+}
+
+/**
+ * Detach a connection from this processor.
+ *
+ * The incoming side is handled when the connection's destination UUID equals
+ * this processor's UUID, the outgoing side when its source UUID does. Nothing
+ * is done while the processor is running.
+ *
+ * @param connection connection to detach
+ */
+void Processor::removeConnection(Connection *connection)
+{
+       if (isRunning())
+       {
+               _logger->log_info("Can not remove connection while the process %s is running",
+                               _name.c_str());
+               return;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       uuid_t sourceId;
+       uuid_t destinationId;
+       connection->getSourceProcessorUUID(sourceId);
+       connection->getDestinationProcessorUUID(destinationId);
+
+       // Connection is destination to the current processor
+       if (uuid_compare(_uuid, destinationId) == 0
+                       && _incomingConnections.erase(connection) > 0)
+       {
+               connection->setDestinationProcessor(NULL);
+               _logger->log_info("Remove connection %s into Processor %s incoming connection",
+                               connection->getName().c_str(), _name.c_str());
+               // restart the rotation used by getNextIncomingConnection()
+               _incomingConnectionsIter = this->_incomingConnections.begin();
+       }
+
+       // Connection is source from the current processor
+       if (uuid_compare(_uuid, sourceId) == 0)
+       {
+               std::string relationship = connection->getRelationship().getName();
+               std::map<std::string, std::set<Connection *>>::iterator entry =
+                               _outGoingConnections.find(relationship);
+               if (entry != _outGoingConnections.end() && entry->second.erase(connection) > 0)
+               {
+                       connection->setSourceProcessor(NULL);
+                       _logger->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s",
+                                       connection->getName().c_str(), _name.c_str(), relationship.c_str());
+               }
+       }
+}
+
+/**
+ * Hand out the incoming connections one at a time, wrapping around at the end.
+ *
+ * @return the connection at the current rotation position, or NULL when there
+ *         are no incoming connections
+ */
+Connection *Processor::getNextIncomingConnection()
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       if (_incomingConnections.empty())
+               return NULL;
+
+       // Wrap before reading so the iterator is always dereferenceable
+       if (_incomingConnectionsIter == _incomingConnections.end())
+               _incomingConnectionsIter = _incomingConnections.begin();
+
+       Connection *picked = *_incomingConnectionsIter;
+
+       // Advance, wrapping again so the stored position stays valid
+       ++_incomingConnectionsIter;
+       if (_incomingConnectionsIter == _incomingConnections.end())
+               _incomingConnectionsIter = _incomingConnections.begin();
+
+       return picked;
+}
+
+/**
+ * @return true when at least one incoming connection reports a queue size
+ *         greater than zero; false otherwise (including no incoming connections)
+ */
+bool Processor::flowFilesQueued()
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       for (Connection *incoming : _incomingConnections)
+       {
+               if (incoming->getQueueSize() > 0)
+                       return true;
+       }
+
+       return false;
+}
+
+/**
+ * @return true when any outgoing connection, under any relationship, reports
+ *         isFull(); false otherwise
+ */
+bool Processor::flowFilesOutGoingFull()
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       for (const auto &byRelationship : _outGoingConnections)
+       {
+               // second is the set of connections registered for this relationship
+               for (Connection *outgoing : byRelationship.second)
+               {
+                       if (outgoing->isFull())
+                               return true;
+               }
+       }
+
+       return false;
+}
+
+/**
+ * Run one trigger cycle: build a context and session, invoke the
+ * two-argument onTrigger(), then commit the session.
+ *
+ * If anything in that sequence throws, the session is rolled back and the
+ * exception is rethrown to the caller.
+ *
+ * The context and session are automatic objects, so both are destroyed on
+ * every exit path (session first, then context), including when the session
+ * constructor or rollback() throws.
+ */
+void Processor::onTrigger()
+{
+       ProcessContext context(this);
+       ProcessSession session(&context);
+       try
+       {
+               // Call the child onTrigger function
+               this->onTrigger(&context, &session);
+               session.commit();
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("Caught Exception %s", exception.what());
+               session.rollback();
+               throw;
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception Processor::onTrigger");
+               session.rollback();
+               throw;
+       }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/RealTimeDataCollector.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RealTimeDataCollector.cpp 
b/libminifi/src/RealTimeDataCollector.cpp
new file mode 100644
index 0000000..c7118ff
--- /dev/null
+++ b/libminifi/src/RealTimeDataCollector.cpp
@@ -0,0 +1,482 @@
+/**
+ * @file RealTimeDataCollector.cpp
+ * RealTimeDataCollector class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+
+#include "RealTimeDataCollector.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+// Processor name
+const std::string RealTimeDataCollector::ProcessorName("RealTimeDataCollector");
+
+// Supported properties; the three arguments are written as name, description, default value
+Property RealTimeDataCollector::FILENAME(
+               "File Name", "File Name for the real time processor to process", "data.osp");
+Property RealTimeDataCollector::REALTIMESERVERNAME(
+               "Real Time Server Name", "Real Time Server Name", "localhost");
+Property RealTimeDataCollector::REALTIMESERVERPORT(
+               "Real Time Server Port", "Real Time Server Port", "10000");
+Property RealTimeDataCollector::BATCHSERVERNAME(
+               "Batch Server Name", "Batch Server Name", "localhost");
+Property RealTimeDataCollector::BATCHSERVERPORT(
+               "Batch Server Port", "Batch Server Port", "10001");
+Property RealTimeDataCollector::ITERATION(
+               "Iteration", "If true, sample osp file will be iterated", "true");
+Property RealTimeDataCollector::REALTIMEMSGID(
+               "Real Time Message ID", "Real Time Message ID", "41");
+Property RealTimeDataCollector::BATCHMSGID(
+               "Batch Message ID", "Batch Message ID", "172, 30, 48");
+Property RealTimeDataCollector::REALTIMEINTERVAL(
+               "Real Time Interval", "Real Time Data Collection Interval in msec", "10 ms");
+Property RealTimeDataCollector::BATCHINTERVAL(
+               "Batch Time Interval", "Batch Processing Interval in msec", "100 ms");
+Property RealTimeDataCollector::BATCHMAXBUFFERSIZE(
+               "Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144");
+
+// Supported relationships
+Relationship RealTimeDataCollector::Success(
+               "success", "success operational on the flow record");
+
+/**
+ * Register the properties and the relationship this processor supports.
+ */
+void RealTimeDataCollector::initialize()
+{
+       //! Set the supported properties
+       std::set<Property> properties = {
+               FILENAME,
+               REALTIMESERVERNAME,
+               REALTIMESERVERPORT,
+               BATCHSERVERNAME,
+               BATCHSERVERPORT,
+               ITERATION,
+               REALTIMEMSGID,
+               BATCHMSGID,
+               REALTIMEINTERVAL,
+               BATCHINTERVAL,
+               BATCHMAXBUFFERSIZE
+       };
+       setSupportedProperties(properties);
+
+       //! Set the supported relationships
+       std::set<Relationship> relationships = { Success };
+       setSupportedRelationships(relationships);
+}
+
+/**
+ * Resolve a host name and open a connected TCP socket to it.
+ *
+ * @param host host name to resolve
+ * @param port TCP port in host byte order
+ * @return the connected socket descriptor, or 0 on any failure (resolution,
+ *         socket creation, socket options, bind or connect); a socket that was
+ *         already created is closed before returning 0
+ */
+int RealTimeDataCollector::connectServer(const char *host, uint16_t port)
+{
+       in_addr_t addr;
+       int sock = 0;
+       struct hostent *h = NULL;
+#ifdef __MACH__
+       h = gethostbyname(host);
+#else
+       char buf[1024];
+       struct hostent he;
+       int hh_errno;
+       // On failure the result pointer is left NULL, which is checked below
+       gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
+#endif
+       // Resolution can fail (unknown host, resolver error); never dereference a
+       // missing entry, and only accept an address that exactly fills addr
+       if (h == NULL || h->h_addr_list[0] == NULL || h->h_length != (int) sizeof(addr))
+       {
+               _logger->log_error("Could not resolve hostName %s", host);
+               return 0;
+       }
+       memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+       sock = socket(AF_INET, SOCK_STREAM, 0);
+       if (sock < 0)
+       {
+               _logger->log_error("Could not create socket to hostName %s", host);
+               return 0;
+       }
+
+#ifndef __MACH__
+       int opt = 1;
+       bool nagle_off = true;
+
+       if (nagle_off)
+       {
+               if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+               {
+                       _logger->log_error("setsockopt() TCP_NODELAY failed");
+                       close(sock);
+                       return 0;
+               }
+               if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                               (char *)&opt, sizeof(opt)) < 0)
+               {
+                       _logger->log_error("setsockopt() SO_REUSEADDR failed");
+                       close(sock);
+                       return 0;
+               }
+       }
+
+       int sndsize = 256*1024;
+       if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+       {
+               _logger->log_error("setsockopt() SO_SNDBUF failed");
+               close(sock);
+               return 0;
+       }
+#endif
+
+       struct sockaddr_in sa;
+       socklen_t socklen;
+       int status;
+
+       //TODO bind socket to the interface
+       // Local end: any address, port 0
+       memset(&sa, 0, sizeof(sa));
+       sa.sin_family = AF_INET;
+       sa.sin_addr.s_addr = htonl(INADDR_ANY);
+       sa.sin_port = htons(0);
+       socklen = sizeof(sa);
+       if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
+       {
+               _logger->log_error("socket bind failed");
+               close(sock);
+               return 0;
+       }
+
+       // Remote end: the resolved address (copied as returned by the resolver)
+       memset(&sa, 0, sizeof(sa));
+       sa.sin_family = AF_INET;
+       sa.sin_addr.s_addr = addr;
+       sa.sin_port = htons(port);
+       socklen = sizeof(sa);
+
+       status = connect(sock, (struct sockaddr *)&sa, socklen);
+
+       if (status < 0)
+       {
+               _logger->log_error("socket connect failed to %s %d", host, port);
+               close(sock);
+               return 0;
+       }
+
+       _logger->log_info("socket %d connect to server %s port %d success", sock, host, port);
+
+       return sock;
+}
+
+/**
+ * Send a whole buffer over a socket, repeating send() until buflen bytes went out.
+ *
+ * @param socket socket descriptor to send on
+ * @param buf    data to send
+ * @param buflen number of bytes to send
+ * @return -1 as soon as a send() call fails; otherwise the byte count of the
+ *         last send() call (0 when buflen is not positive)
+ */
+int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen)
+{
+       int lastSent = 0;
+
+       for (int offset = 0; offset < buflen; offset += lastSent)
+       {
+               lastSent = send(socket, buf + offset, buflen - offset, 0);
+               //check for errors
+               if (lastSent == -1)
+                       return lastSent;
+       }
+
+       if (lastSent)
+               _logger->log_debug("Send data size %d over socket %d", buflen, socket);
+
+       return lastSent;
+}
+
+/**
+ * Real-time worker step.
+ *
+ * Once the accumulated scheduling time reaches _realTimeInterval, the message
+ * ID properties are re-read and the data file is read line by line. The first
+ * comma-separated field of each line is compared with the batch IDs (a match
+ * queues the line for the batch sender) and with the real-time IDs (a match
+ * sends the line on the real-time socket, falling back to the batch queue when
+ * the socket is unavailable or the send fails). Reading stops for this trigger
+ * after the first line that matched a real-time ID. The file is closed when
+ * its end is reached.
+ *
+ * @param context used to read the processor's scheduling period
+ * @param session not used by this method
+ */
+void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, ProcessSession *session)
+{
+       // Queue one line for the batch sender, evicting the oldest entries while
+       // the queued size would exceed _batchMaxBufferSize.
+       auto enqueueForBatch = [this](const char *kind, const std::string &msgID, const std::string &data)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               // The emptiness check keeps front()/pop() off an empty queue when a
+               // single line is larger than the configured limit
+               while (!_queue.empty() && (_queuedDataSize + data.size()) > _batchMaxBufferSize)
+               {
+                       const std::string::size_type evicted = _queue.front().size();
+                       _queuedDataSize -= evicted;
+                       _logger->log_debug("Pop item size %d from batch queue, queue buffer size %d",
+                                       static_cast<int>(evicted), _queuedDataSize);
+                       _queue.pop();
+               }
+               _queue.push(data);
+               _queuedDataSize += data.size();
+               _logger->log_debug("Push %s msg ID %s into batch queue, queue buffer size %d",
+                               kind, msgID.c_str(), _queuedDataSize);
+       };
+
+       if (_realTimeAccumulated >= this->_realTimeInterval)
+       {
+               std::string value;
+               if (this->getProperty(REALTIMEMSGID.getName(), value))
+               {
+                       this->_realTimeMsgID.clear();
+                       this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
+                       std::stringstream idStream(value);
+                       std::string id;
+
+                       while (std::getline(idStream, id, ','))
+                       {
+                               // Trimmed like the batch IDs: the field they are compared
+                               // with is trimmed too, so "41, 42" must yield "42"
+                               this->_realTimeMsgID.push_back(Property::trim(id));
+                       }
+               }
+               if (this->getProperty(BATCHMSGID.getName(), value))
+               {
+                       this->_batchMsgID.clear();
+                       this->_logger->log_info("Batch Msg IDs %s", value.c_str());
+                       std::stringstream idStream(value);
+                       std::string id;
+
+                       while (std::getline(idStream, id, ','))
+                       {
+                               this->_batchMsgID.push_back(Property::trim(id));
+                       }
+               }
+
+               // Open the file
+               if (!this->_fileStream.is_open())
+               {
+                       _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
+                       if (this->_fileStream.is_open())
+                               _logger->log_debug("open %s", _fileName.c_str());
+               }
+               if (!_fileStream.good())
+               {
+                       _logger->log_error("load data file failed %s", _fileName.c_str());
+                       return;
+               }
+               if (this->_fileStream.is_open())
+               {
+                       std::string line;
+
+                       while (std::getline(_fileStream, line))
+                       {
+                               line += "\n";
+                               std::stringstream lineStream(line);
+                               std::string cell;
+                               if (!std::getline(lineStream, cell, ','))
+                                       continue;
+
+                               cell = Property::trim(cell);
+
+                               // Check whether it match to the batch traffic
+                               for (const std::string &batchID : _batchMsgID)
+                               {
+                                       if (cell == batchID)
+                                               enqueueForBatch("batch", cell, line);
+                               }
+
+                               bool findRealTime = false;
+                               // Check whether it match to the real time traffic
+                               for (const std::string &realTimeID : _realTimeMsgID)
+                               {
+                                       if (cell != realTimeID)
+                                               continue;
+
+                                       int status = 0;
+                                       if (this->_realTimeSocket <= 0)
+                                       {
+                                               // Connect the LTE socket
+                                               uint16_t port = _realTimeServerPort;
+                                               this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
+                                       }
+                                       if (this->_realTimeSocket > 0)
+                                       {
+                                               // try to send the data
+                                               status = sendData(_realTimeSocket, line.data(), line.size());
+                                               if (status < 0)
+                                               {
+                                                       close(_realTimeSocket);
+                                                       _realTimeSocket = 0;
+                                               }
+                                       }
+                                       if (this->_realTimeSocket <= 0 || status < 0)
+                                       {
+                                               // could not send in real time: push the data to the batch queue
+                                               enqueueForBatch("real time", cell, line);
+                                       }
+                                       // find real time
+                                       findRealTime = true;
+                               } // for real time pattern
+
+                               if (findRealTime)
+                                       // we break the while once we find the first real time
+                                       break;
+                       } // while
+                       if (_fileStream.eof())
+                       {
+                               _fileStream.close();
+                       }
+               } // if open
+               _realTimeAccumulated = 0;
+       }
+       _realTimeAccumulated += context->getProcessor()->getSchedulingPeriodNano();
+}
+
+/**
+ * Batch worker step.
+ *
+ * Once the accumulated scheduling time reaches _batchInterval, the batch
+ * socket is connected if needed and the queued lines are sent in order. A line
+ * is removed from the queue only after it was sent; when a send fails the
+ * socket is closed and the unsent line stays queued for the next attempt.
+ *
+ * @param context used to read the processor's scheduling period
+ * @param session not used by this method
+ */
+void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, ProcessSession *session)
+{
+       if (_batchAcccumulated >= this->_batchInterval)
+       {
+               // dequeue the batch and send over WIFI
+               if (this->_batchSocket <= 0)
+               {
+                       // Connect the WIFI socket
+                       uint16_t port = _batchServerPort;
+                       this->_batchSocket = connectServer(_batchServerName.c_str(), port);
+               }
+               if (this->_batchSocket > 0)
+               {
+                       std::lock_guard<std::mutex> lock(_mtx);
+
+                       while (!_queue.empty())
+                       {
+                               const std::string &line = _queue.front();
+                               int status = sendData(_batchSocket, line.data(), line.size());
+                               if (status < 0)
+                               {
+                                       // keep the line queued; drop the broken socket so the
+                                       // next trigger reconnects
+                                       close(_batchSocket);
+                                       _batchSocket = 0;
+                                       break;
+                               }
+                               // update the accounting before pop() invalidates the reference
+                               _queuedDataSize -= line.size();
+                               _queue.pop();
+                       }
+               }
+               _batchAcccumulated = 0;
+       }
+       _batchAcccumulated += context->getProcessor()->getSchedulingPeriodNano();
+}
+
+/**
+ * Entry point shared by every scheduled task of this processor.
+ *
+ * The calling thread is matched against the recorded thread IDs: the real
+ * time thread is forwarded to onTriggerRealTime() and the batch thread to
+ * onTriggerBatch(). Any other thread takes _mtx and either
+ *  - runs the setup (reads the properties, opens the data file, connects
+ *    both sockets) and records itself as the real time thread, or
+ *  - when setup has already completed, records itself as the batch thread.
+ *
+ * When the data file cannot be opened the setup is not marked as done, so
+ * it runs again on a later trigger.
+ *
+ * @param context process context forwarded to the real time/batch handlers
+ * @param session process session forwarded to the real time/batch handlers
+ */
+void RealTimeDataCollector::onTrigger(ProcessContext *context,
+               ProcessSession *session)
+{
+       std::thread::id id = std::this_thread::get_id();
+
+       if (id == _realTimeThreadId)
+               return onTriggerRealTime(context, session);
+       else if (id == _batchThreadId)
+               return onTriggerBatch(context, session);
+       else
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               if (!this->_firstInvoking)
+               {
+                       std::string value;
+
+                       // Data file to replay; defaults to data.osp
+                       this->_fileName = "data.osp";
+                       if (this->getProperty(FILENAME.getName(), value))
+                       {
+                               this->_fileName = value;
+                               this->_logger->log_info(
+                                               "Data Collector File Name %s",
+                                               _fileName.c_str());
+                       }
+                       // Real time server endpoint; defaults to localhost:10000
+                       this->_realTimeServerName = "localhost";
+                       if (this->getProperty(REALTIMESERVERNAME.getName(),
+                                       value))
+                       {
+                               this->_realTimeServerName = value;
+                               this->_logger->log_info(
+                                               "Real Time Server Name %s",
+                                               this->_realTimeServerName.c_str());
+                       }
+                       this->_realTimeServerPort = 10000;
+                       if (this->getProperty(REALTIMESERVERPORT.getName(),
+                                       value))
+                       {
+                               Property::StringToInt(value,
+                                               _realTimeServerPort);
+                               this->_logger->log_info(
+                                               "Real Time Server Port %d",
+                                               _realTimeServerPort);
+                       }
+                       // Batch server endpoint; only the port gets a default here
+                       if (this->getProperty(BATCHSERVERNAME.getName(), value))
+                       {
+                               this->_batchServerName = value;
+                               this->_logger->log_info("Batch Server Name %s",
+                                               this->_batchServerName.c_str());
+                       }
+                       this->_batchServerPort = 10001;
+                       if (this->getProperty(BATCHSERVERPORT.getName(), value))
+                       {
+                               Property::StringToInt(value, _batchServerPort);
+                               this->_logger->log_info("Batch Server Port %d",
+                                               _batchServerPort);
+                       }
+                       if (this->getProperty(ITERATION.getName(), value))
+                       {
+                               Property::StringToBool(value, this->_iteration);
+                               _logger->log_info("Iteration %d", _iteration);
+                       }
+                       this->_realTimeInterval = 10000000; //10 msec
+                       if (this->getProperty(REALTIMEINTERVAL.getName(),
+                                       value))
+                       {
+                               TimeUnit unit;
+                               if (Property::StringToTime(value,
+                                               _realTimeInterval, unit) &&
+                                       Property::ConvertTimeUnitToNS(
+                                               _realTimeInterval, unit,
+                                               _realTimeInterval))
+                               {
+                                       _logger->log_info(
+                                               "Real Time Interval: [%d] ns",
+                                               _realTimeInterval);
+                               }
+                       }
+                       this->_batchInterval = 100000000; //100 msec
+                       if (this->getProperty(BATCHINTERVAL.getName(), value))
+                       {
+                               TimeUnit unit;
+                               if (Property::StringToTime(value,
+                                               _batchInterval, unit) &&
+                                       Property::ConvertTimeUnitToNS(
+                                               _batchInterval, unit,
+                                               _batchInterval))
+                               {
+                                       _logger->log_info(
+                                               "Batch Time Interval: [%d] ns",
+                                               _batchInterval);
+                               }
+                       }
+                       this->_batchMaxBufferSize = 256*1024;
+                       if (this->getProperty(BATCHMAXBUFFERSIZE.getName(),
+                                       value))
+                       {
+                               Property::StringToInt(value,
+                                               _batchMaxBufferSize);
+                               this->_logger->log_info(
+                                               "Batch Max Buffer Size %d",
+                                               _batchMaxBufferSize);
+                       }
+                       if (this->getProperty(REALTIMEMSGID.getName(), value))
+                       {
+                               this->_logger->log_info("Real Time Msg IDs %s",
+                                               value.c_str());
+                               std::stringstream lineStream(value);
+                               std::string cell;
+
+                               // Setup can run more than once (see the file
+                               // open failure below); start from an empty
+                               // list so the IDs are not duplicated.
+                               this->_realTimeMsgID.clear();
+                               while (std::getline(lineStream, cell, ','))
+                               {
+                                       // Trim like the batch IDs so that
+                                       // "a, b" yields "b" rather than " b"
+                                       cell = Property::trim(cell);
+                                       this->_realTimeMsgID.push_back(cell);
+                                       this->_logger->log_info(
+                                                       "Real Time Msg ID %s",
+                                                       cell.c_str());
+                               }
+                       }
+                       if (this->getProperty(BATCHMSGID.getName(), value))
+                       {
+                               this->_logger->log_info("Batch Msg IDs %s",
+                                               value.c_str());
+                               std::stringstream lineStream(value);
+                               std::string cell;
+
+                               // Same reasoning as for the real time IDs
+                               this->_batchMsgID.clear();
+                               while (std::getline(lineStream, cell, ','))
+                               {
+                                       cell = Property::trim(cell);
+                                       this->_batchMsgID.push_back(cell);
+                                       this->_logger->log_info(
+                                                       "Batch Msg ID %s",
+                                                       cell.c_str());
+                               }
+                       }
+
+                       // Open the file before connecting any socket: a
+                       // failed open returns without marking setup as done,
+                       // so setup is repeated on a later trigger. Connecting
+                       // first would overwrite the previous socket
+                       // descriptors with new ones on every such retry.
+                       _fileStream.open(this->_fileName.c_str(),
+                                       std::ifstream::in);
+                       if (!_fileStream.good())
+                       {
+                               _logger->log_error("load data file failed %s",
+                                               _fileName.c_str());
+                               return;
+                       }
+                       _logger->log_debug("open %s", _fileName.c_str());
+
+                       // Connect the LTE socket
+                       uint16_t port = _realTimeServerPort;
+                       this->_realTimeSocket =
+                               connectServer(_realTimeServerName.c_str(), port);
+
+                       // Connect the WIFI socket
+                       port = _batchServerPort;
+                       this->_batchSocket =
+                               connectServer(_batchServerName.c_str(), port);
+
+                       // The thread that completed setup becomes the real
+                       // time thread
+                       _realTimeThreadId = id;
+                       this->_firstInvoking = true;
+               }
+               else
+               {
+                       // Setup already done: this other thread becomes the
+                       // batch thread
+                       if (id != _realTimeThreadId)
+                               _batchThreadId = id;
+                       this->_firstInvoking = false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
new file mode 100644
index 0000000..9d849ae
--- /dev/null
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -0,0 +1,100 @@
+/**
+ * @file RemoteProcessorGroupPort.cpp
+ * RemoteProcessorGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <sstream>
+#include <string.h>
+#include <iostream>
+
+#include "TimeUtil.h"
+#include "RemoteProcessorGroupPort.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Name under which this processor is registered
+const std::string RemoteProcessorGroupPort::ProcessorName(
+               "RemoteProcessorGroupPort");
+//! Remote host property, "localhost" unless configured
+Property RemoteProcessorGroupPort::hostName(
+               "Host Name", "Remote Host Name.", "localhost");
+//! Remote port property, "9999" unless configured
+Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
+//! Default-constructed relationship exposed by the port
+Relationship RemoteProcessorGroupPort::relation;
+
+//! Register the properties and the relationship supported by this port.
+void RemoteProcessorGroupPort::initialize()
+{
+       //! Supported properties: remote host name and remote port
+       std::set<Property> supportedProperties;
+       supportedProperties.insert(hostName);
+       supportedProperties.insert(port);
+       setSupportedProperties(supportedProperties);
+
+       //! Supported relationships: the single static relation
+       std::set<Relationship> supportedRelationships;
+       supportedRelationships.insert(relation);
+       setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Run one Site2Site exchange with the remote peer.
+ *
+ * Does nothing while the port is not transmitting. Otherwise the host name
+ * and port properties are applied to the peer; if either differs from the
+ * peer's current value the protocol is torn down before it is bootstrapped
+ * again. On bootstrap failure the context yields; on success flow files are
+ * received or transferred depending on _direction.
+ *
+ * @param context process context providing the properties and the yield
+ * @param session process session handed to the protocol
+ */
+void RemoteProcessorGroupPort::onTrigger(ProcessContext *context,
+               ProcessSession *session)
+{
+       std::string value;
+
+       if (!_transmitting)
+               return;
+
+       std::string host = _peer->getHostName();
+       uint16_t sport = _peer->getPort();
+       int64_t lvalue;
+       bool needReset = false;
+
+       if (context->getProperty(hostName.getName(), value))
+       {
+               host = value;
+       }
+       if (context->getProperty(port.getName(), value) &&
+                       Property::StringToInt(value, lvalue))
+       {
+               // A plain cast would silently wrap an out-of-range value
+               // (70000 becomes 4464), so only accept what fits in 16 bits
+               // and otherwise keep the peer's current port.
+               if (lvalue >= 0 && lvalue <= 65535)
+                       sport = (uint16_t) lvalue;
+               else
+                       _logger->log_error(
+                               "Invalid Site2Site port %s, keeping port %d",
+                               value.c_str(), sport);
+       }
+       if (host != _peer->getHostName())
+       {
+               _peer->setHostName(host);
+               needReset = true;
+       }
+       if (sport != _peer->getPort())
+       {
+               _peer->setPort(sport);
+               needReset = true;
+       }
+       // The peer address changed: drop the established protocol state
+       if (needReset)
+               _protocol->tearDown();
+
+       // bootstrap the client protocol if needed
+       if (!_protocol->bootstrap())
+       {
+               context->yield();
+               _logger->log_error("Site2Site bootstrap failed yield period %d peer timeout %d",
+                               context->getProcessor()->getYieldPeriodMsec(),
+                               _protocol->getPeer()->getTimeOut());
+               return;
+       }
+
+       if (_direction == RECEIVE)
+               _protocol->receiveFlowFiles(context, session);
+       else
+               _protocol->transferFlowFiles(context, session);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
new file mode 100644
index 0000000..3c22ac9
--- /dev/null
+++ b/libminifi/src/ResourceClaim.cpp
@@ -0,0 +1,45 @@
+/**
+ * @file ResourceClaim.cpp
+ * ResourceClaim class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "ResourceClaim.h"
+
+//! Process-wide counter that hands out the local resource claim IDs
+std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0);
+
+/**
+ * Create a resource claim whose content lives under contentDirectory.
+ *
+ * The claim gets the next local ID, a freshly generated UUID, and a content
+ * path of the form "<contentDirectory>/<uuid>".
+ *
+ * @param contentDirectory directory the content path is built from
+ */
+ResourceClaim::ResourceClaim(const std::string contentDirectory)
+// Take the ID and advance the counter in one atomic step. Reading it with
+// load() and incrementing later lets two threads that construct claims at
+// the same time observe the same value and share an ID.
+: _id(_localResourceClaimNumber++),
+  _flowFileRecordOwnedCount(0)
+{
+       // 36 characters of textual UUID plus the terminating NUL
+       char uuidStr[37];
+
+       // Generate the global UUID for the resource claim
+       uuid_generate(_uuid);
+       uuid_unparse(_uuid, uuidStr);
+       // Create the full content path for the content
+       _contentFullPath = contentDirectory + "/" + uuidStr;
+
+       _configure = Configure::getConfigure();
+       _logger = Logger::getLogger();
+       _logger->log_debug("Resource Claim created %s",
+                       _contentFullPath.c_str());
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
new file mode 100644
index 0000000..211c328
--- /dev/null
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -0,0 +1,86 @@
+/**
+ * @file SchedulingAgent.cpp
+ * SchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <thread>
+#include <iostream>
+#include "Exception.h"
+#include "SchedulingAgent.h"
+
+//! Tell whether the processor has work to do.
+//! True when the processor triggers even when empty, when it has no
+//! incoming connections, or when flow files are queued for it.
+bool SchedulingAgent::hasWorkToDo(Processor *processor)
+{
+       return processor->getTriggerWhenEmpty() ||
+                       !processor->hasIncomingConnections() ||
+                       processor->flowFilesQueued();
+}
+
+//! Tell whether the processor reports its outgoing flow files as full.
+bool SchedulingAgent::hasTooMuchOutGoing(Processor *processor)
+{
+       const bool outgoingFull = processor->flowFilesOutGoingFull();
+       return outgoingFull;
+}
+
+/**
+ * Trigger the processor once, unless it must be skipped.
+ *
+ * @param processor processor to run
+ * @return true when the processor was skipped because it has no work to do
+ *         or because its outgoing side is full; false when it is yielding
+ *         or after it has been triggered (whether or not it threw)
+ */
+bool SchedulingAgent::onTrigger(Processor *processor)
+{
+       if (processor->isYield())
+               return false;
+
+       // No need to yield, reset yield expiration to 0
+       processor->clearYield();
+
+       // Skip this round when there is no work to do, or when back pressure
+       // has to be applied
+       if (!hasWorkToDo(processor) || hasTooMuchOutGoing(processor))
+               return true;
+
+       //TODO runDuration
+
+       // The active task count is released on every path below
+       processor->incrementActiveTasks();
+       try
+       {
+               processor->onTrigger();
+               processor->decrementActiveTask();
+       }
+       catch (Exception &processorError)
+       {
+               // Normal exception: logged only, no administrative yield
+               _logger->log_debug("Caught Exception %s",
+                               processorError.what());
+               processor->decrementActiveTask();
+       }
+       catch (std::exception &unexpectedError)
+       {
+               _logger->log_debug("Caught Exception %s",
+                               unexpectedError.what());
+               processor->yield(_administrativeYieldDuration);
+               processor->decrementActiveTask();
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception during SchedulingAgent::onTrigger");
+               processor->yield(_administrativeYieldDuration);
+               processor->decrementActiveTask();
+       }
+
+       return false;
+}
+

Reply via email to