http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
new file mode 100644
index 0000000..e036b89
--- /dev/null
+++ b/libminifi/src/Connection.cpp
@@ -0,0 +1,160 @@
+/**
+ * @file Connection.cpp
+ * Connection class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <iostream>
+
+#include "Connection.h"
+
+/**
+ * Create a connection (a queue of flow records) and log its creation.
+ *
+ * @param name     connection name, used in log output
+ * @param uuid     identifier to adopt; when NULL a fresh UUID is generated
+ * @param srcUUID  UUID recorded as the source, or NULL when not supplied
+ * @param destUUID UUID recorded as the destination, or NULL when not supplied
+ */
+Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID)
+: _name(name)
+{
+       if (!uuid)
+               // Generate the global UUID for the flow record
+               uuid_generate(_uuid);
+       else
+               uuid_copy(_uuid, uuid);
+
+       // When an endpoint UUID is not supplied, put the member into the
+       // well-defined null state instead of leaving it uninitialised.
+       if (srcUUID)
+               uuid_copy(_srcUUID, srcUUID);
+       else
+               uuid_clear(_srcUUID);
+       if (destUUID)
+               uuid_copy(_destUUID, destUUID);
+       else
+               uuid_clear(_destUUID);
+
+       _srcProcessor = NULL;
+       _destProcessor = NULL;
+       // A zero limit means "no back pressure" (see isFull())
+       _maxQueueSize = 0;
+       _maxQueueDataSize = 0;
+       // A zero duration disables the expiration check in poll()
+       _expiredDuration = 0;
+       _queuedDataSize = 0;
+
+       _logger = Logger::getLogger();
+
+       _logger->log_info("Connection %s created", _name.c_str());
+}
+
+/**
+ * Report whether the connection currently holds no flow records.
+ * The connection mutex is held while the queue is inspected.
+ *
+ * @return true when the queue is empty
+ */
+bool Connection::isEmpty()
+{
+       std::lock_guard<std::mutex> guard(_mtx);
+       const bool nothingQueued = _queue.empty();
+       return nothingQueued;
+}
+
+/**
+ * Report whether a configured back pressure limit has been reached.
+ *
+ * Two independent limits exist: the number of queued flow records and the
+ * accumulated size of the queued data. A limit that is not positive is
+ * treated as "not configured".
+ *
+ * @return true when either configured limit has been reached
+ */
+bool Connection::isFull()
+{
+       std::lock_guard<std::mutex> guard(_mtx);
+
+       const bool countLimited = (_maxQueueSize > 0);
+       const bool bytesLimited = (_maxQueueDataSize > 0);
+
+       // Neither limit configured: back pressure is disabled
+       if (!countLimited && !bytesLimited)
+               return false;
+
+       const bool countReached = countLimited && (_queue.size() >= _maxQueueSize);
+       const bool bytesReached = bytesLimited && (_queuedDataSize >= _maxQueueDataSize);
+
+       return countReached || bytesReached;
+}
+
+/**
+ * Append a flow record to the tail of the queue and account for its size.
+ *
+ * @param flow record to enqueue; it is dereferenced, so it must not be NULL
+ */
+void Connection::put(FlowFileRecord *flow)
+{
+       std::lock_guard<std::mutex> guard(_mtx);
+
+       _queue.push(flow);
+       // Keep the running byte total in step with the queue contents
+       _queuedDataSize += flow->getSize();
+
+       _logger->log_debug("Enqueue flow file UUID %s to connection %s", flow->getUUIDStr().c_str(), _name.c_str());
+}
+
+/**
+ * Take the next deliverable flow record off the queue.
+ *
+ * Records are examined from the head of the queue:
+ *  - a record whose entry date plus the expiration duration lies in the past
+ *    is moved into expiredFlowRecords and the scan continues (only when an
+ *    expiration duration is configured);
+ *  - a penalized record is pushed back onto the tail of the queue and the
+ *    scan stops without returning anything;
+ *  - any other record is tagged with this connection and returned.
+ *
+ * @param expiredFlowRecords receives every expired record met during the scan
+ * @return the dequeued record, or NULL when nothing can be delivered
+ */
+FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords)
+{
+       std::lock_guard<std::mutex> guard(_mtx);
+
+       while (!_queue.empty())
+       {
+               FlowFileRecord *candidate = _queue.front();
+               _queue.pop();
+               _queuedDataSize -= candidate->getSize();
+
+               // The expiration check only applies when a duration is configured
+               if (_expiredDuration > 0 && getTimeMillis() > (candidate->getEntryDate() + _expiredDuration))
+               {
+                       // Flow record expired
+                       expiredFlowRecords.insert(candidate);
+                       continue;
+               }
+
+               if (candidate->isPenalized())
+               {
+                       // Flow record was penalized: requeue it and give up for now
+                       _queue.push(candidate);
+                       _queuedDataSize += candidate->getSize();
+                       break;
+               }
+
+               candidate->setOriginalConnection(this);
+               _logger->log_debug("Dequeue flow file UUID %s from connection %s", candidate->getUUIDStr().c_str(), _name.c_str());
+               return candidate;
+       }
+
+       return NULL;
+}
+
+/**
+ * Discard every queued flow record.
+ *
+ * Each record is removed from the queue and deleted, so the connection must
+ * own the records it holds at this point. The queued byte counter is reset
+ * as well, otherwise isFull() would keep reporting data-size back pressure
+ * for a queue that is in fact empty.
+ */
+void Connection::drain()
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       while (!_queue.empty())
+       {
+               FlowFileRecord *item = _queue.front();
+               _queue.pop();
+               delete item;
+       }
+
+       // Nothing is queued any more, so no data is accounted for either
+       _queuedDataSize = 0;
+
+       _logger->log_debug("Drain connection %s", _name.c_str());
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowControlProtocol.cpp 
b/libminifi/src/FlowControlProtocol.cpp
new file mode 100644
index 0000000..011ebcf
--- /dev/null
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -0,0 +1,541 @@
+/**
+ * @file FlowControlProtocol.cpp
+ * FlowControlProtocol class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+#include <iostream>
+#include "FlowController.h"
+#include "FlowControlProtocol.h"
+
+/**
+ * Resolve a host name and open a TCP connection to it.
+ *
+ * @param host server host name to resolve
+ * @param port server TCP port, in host byte order
+ * @return the connected socket descriptor, or 0 on any failure (resolution,
+ *         socket creation, socket options, bind or connect); every failure
+ *         is logged and the socket, if created, is closed
+ */
+int FlowControlProtocol::connectServer(const char *host, uint16_t port)
+{
+       in_addr_t addr;
+       int sock = 0;
+       struct hostent *h = NULL;
+#ifdef __MACH__
+       h = gethostbyname(host);
+#else
+       char buf[1024];
+       struct hostent he;
+       int hh_errno;
+       // On failure the result pointer must not be trusted
+       if (gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno) != 0)
+               h = NULL;
+#endif
+       // Only an IPv4 answer of the expected size may be copied into addr
+       if (h == NULL || h->h_addrtype != AF_INET || h->h_addr_list[0] == NULL ||
+                       h->h_length != static_cast<int>(sizeof(addr)))
+       {
+               _logger->log_error("Could not resolve hostName %s", host);
+               return 0;
+       }
+       memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+       sock = socket(AF_INET, SOCK_STREAM, 0);
+       if (sock < 0)
+       {
+               _logger->log_error("Could not create socket to hostName %s", host);
+               return 0;
+       }
+
+#ifndef __MACH__
+       int opt = 1;
+
+       // Disable Nagle's algorithm
+       if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+       {
+               _logger->log_error("setsockopt() TCP_NODELAY failed");
+               close(sock);
+               return 0;
+       }
+       if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)) < 0)
+       {
+               _logger->log_error("setsockopt() SO_REUSEADDR failed");
+               close(sock);
+               return 0;
+       }
+
+       int sndsize = 256*1024;
+       if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+       {
+               _logger->log_error("setsockopt() SO_SNDBUF failed");
+               close(sock);
+               return 0;
+       }
+#endif
+
+       struct sockaddr_in sa;
+       socklen_t socklen;
+       int status;
+
+       // Bind the local end to any address and an ephemeral port
+       memset(&sa, 0, sizeof(sa));
+       sa.sin_family = AF_INET;
+       sa.sin_addr.s_addr = htonl(INADDR_ANY);
+       sa.sin_port = htons(0);
+       socklen = sizeof(sa);
+       if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
+       {
+               _logger->log_error("socket bind failed");
+               close(sock);
+               return 0;
+       }
+
+       // addr was copied from the resolver result unchanged, so it is stored as-is
+       memset(&sa, 0, sizeof(sa));
+       sa.sin_family = AF_INET;
+       sa.sin_addr.s_addr = addr;
+       sa.sin_port = htons(port);
+       socklen = sizeof(sa);
+
+       status = connect(sock, (struct sockaddr *)&sa, socklen);
+
+       if (status < 0)
+       {
+               _logger->log_error("socket connect failed to %s %d", host, port);
+               close(sock);
+               return 0;
+       }
+
+       _logger->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port);
+
+       return sock;
+}
+
+/**
+ * Write a whole buffer to the protocol socket, looping over partial sends.
+ *
+ * @param buf    data to transmit
+ * @param buflen number of bytes to transmit
+ * @return the number of bytes sent once the loop completes, or -1 as soon
+ *         as send() reports an error
+ */
+int FlowControlProtocol::sendData(uint8_t *buf, int buflen)
+{
+       int sent = 0;
+
+       while (sent < buflen)
+       {
+               const int written = send(_socket, buf + sent, buflen - sent, 0);
+               // check for errors
+               if (written == -1)
+                       return written;
+               sent += written;
+       }
+
+       return sent;
+}
+
+/**
+ * Wait until the protocol socket becomes readable.
+ *
+ * @param msec timeout in milliseconds; a value that is not positive makes
+ *             the call block without a timeout
+ * @return the select() result when it is not positive (timeout or error) or
+ *         when the socket is flagged readable, otherwise 0
+ */
+int FlowControlProtocol::selectClient(int msec)
+{
+       const int fd = _socket;
+
+       fd_set readSet;
+       FD_ZERO(&readSet);
+       FD_SET(fd, &readSet);
+
+       struct timeval timeout;
+       timeout.tv_sec = msec / 1000;
+       timeout.tv_usec = (msec % 1000) * 1000;
+
+       // select() blocks indefinitely when it is handed a NULL timeout
+       struct timeval *timeoutArg = (msec > 0) ? &timeout : NULL;
+       const int ready = select(fd + 1, &readSet, NULL, NULL, timeoutArg);
+
+       if (ready <= 0)
+               return ready;
+
+       return FD_ISSET(fd, &readSet) ? ready : 0;
+}
+
+/**
+ * Read exactly buflen bytes from the protocol socket.
+ *
+ * Before every read the socket is polled with selectClient() using
+ * MAX_READ_TIMEOUT, and the loop continues until the requested amount has
+ * arrived.
+ *
+ * @param buf    destination buffer, at least buflen bytes large
+ * @param buflen number of bytes to read
+ * @return buflen when everything was read (0 for a zero-length request),
+ *         -1 for a negative buflen, otherwise the non-positive status of
+ *         the select or read call that stopped the transfer
+ */
+int FlowControlProtocol::readData(uint8_t *buf, int buflen)
+{
+       // A negative length would reach read()/recv() as a huge unsigned count
+       if (buflen < 0)
+               return -1;
+
+       const int requested = buflen;
+
+       while (buflen)
+       {
+               int status;
+               status = selectClient(MAX_READ_TIMEOUT);
+               if (status <= 0)
+               {
+                       return status;
+               }
+#ifndef __MACH__
+               status = read(_socket, buf, buflen);
+#else
+               status = recv(_socket, buf, buflen, 0);
+#endif
+               if (status <= 0)
+               {
+                       return status;
+               }
+               // Advance past what arrived and keep reading the remainder
+               buflen -= status;
+               buf += status;
+       }
+
+       return requested;
+}
+
+/**
+ * Read one protocol header from the socket and decode it.
+ *
+ * The header is decoded as four 32-bit values in this order: message type,
+ * sequence number, status and payload length.
+ *
+ * @param hdr receives the decoded header fields
+ * @return sizeof(FlowControlProtocolHeader) on success, otherwise the
+ *         non-positive status returned by readData()
+ */
+int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr)
+{
+       uint8_t raw[sizeof(FlowControlProtocolHeader)];
+
+       const int status = readData(raw, sizeof(FlowControlProtocolHeader));
+       if (status <= 0)
+               return status;
+
+       uint8_t *cursor = raw;
+       uint32_t field;
+
+       cursor = this->decode(cursor, field);
+       hdr->msgType = field;
+
+       cursor = this->decode(cursor, field);
+       hdr->seqNumber = field;
+
+       cursor = this->decode(cursor, field);
+       hdr->status = field;
+
+       cursor = this->decode(cursor, field);
+       hdr->payloadLen = field;
+
+       return sizeof(FlowControlProtocolHeader);
+}
+
+/**
+ * Start the background reporting thread.
+ *
+ * Nothing happens when the report interval is not positive or when the
+ * protocol is already running. Otherwise a thread executing run() is
+ * created and detached.
+ */
+void FlowControlProtocol::start()
+{
+       if (_reportInterval <= 0 || _running)
+               return;
+
+       _running = true;
+       _logger->log_info("FlowControl Protocol Start");
+       // The thread is detached: it ends on its own once _running is cleared
+       _thread = new std::thread(run, this);
+       _thread->detach();
+}
+
+/**
+ * Ask the reporting loop to stop by clearing the running flag.
+ * Calling it while the protocol is not running has no effect.
+ */
+void FlowControlProtocol::stop()
+{
+       if (_running)
+       {
+               _running = false;
+               _logger->log_info("FlowControl Protocol Stop");
+       }
+}
+
+/**
+ * Body of the reporting thread.
+ *
+ * While the protocol is flagged as running, sleep for the report interval
+ * and then send either a register request (not registered yet) or a report
+ * request (already registered).
+ *
+ * @param protocol the protocol instance this thread works for
+ */
+void FlowControlProtocol::run(FlowControlProtocol *protocol)
+{
+       while (protocol->_running)
+       {
+               const std::chrono::milliseconds pause(protocol->_reportInterval);
+               std::this_thread::sleep_for(pause);
+
+               if (protocol->_registered)
+                       protocol->sendReportReq();
+               else
+                       // not registered yet
+                       protocol->sendRegisterReq();
+       }
+}
+
+/**
+ * Register this agent with the flow control server.
+ *
+ * Connects if necessary, sends a REGISTER_REQ carrying the serial number
+ * and the controller name, then reads the response. On a successful
+ * response the payload fields are processed:
+ *  - REPORT_INTERVAL  updates the report interval;
+ *  - FLOW_XML_CONTENT is written to a time-stamped file and the controller
+ *    is reloaded from that file.
+ * Parsing stops at the first unknown or truncated field. The socket is
+ * closed before returning once a request has been sent.
+ *
+ * @return 0 when the server accepted the registration, -1 otherwise
+ *         (already registered, connection/send/read failure, or rejection)
+ */
+int FlowControlProtocol::sendRegisterReq()
+{
+       if (_registered)
+       {
+               _logger->log_info("Already registered");
+               return -1;
+       }
+
+       uint16_t port = this->_serverPort;
+
+       if (this->_socket <= 0)
+               this->_socket = connectServer(_serverName.c_str(), port);
+
+       if (this->_socket <= 0)
+               return -1;
+
+       // Calculate the total payload msg size
+       uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) +
+                       FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1);
+       uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+
+       uint8_t *data = new uint8_t[size];
+       uint8_t *start = data;
+
+       // encode the HDR
+       FlowControlProtocolHeader hdr;
+       hdr.msgType = REGISTER_REQ;
+       hdr.payloadLen = payloadSize;
+       hdr.seqNumber  = this->_seqNumber;
+       hdr.status = RESP_SUCCESS;
+       data = this->encode(data, hdr.msgType);
+       data = this->encode(data, hdr.seqNumber);
+       data = this->encode(data, hdr.status);
+       data = this->encode(data, hdr.payloadLen);
+
+       // encode the serial number
+       data = this->encode(data, FLOW_SERIAL_NUMBER);
+       data = this->encode(data, this->_serialNumber, 8);
+
+       // encode the XML name
+       data = this->encode(data, FLOW_XML_NAME);
+       data = this->encode(data, this->_controller->getName());
+
+       // send it
+       int status = sendData(start, size);
+       delete[] start;
+       if (status <= 0)
+       {
+               close(_socket);
+               _socket = 0;
+               _logger->log_error("Flow Control Protocol Send Register Req failed");
+               return -1;
+       }
+
+       // Looking for register respond
+       status = readHdr(&hdr);
+
+       if (status <= 0)
+       {
+               close(_socket);
+               _socket = 0;
+               _logger->log_error("Flow Control Protocol Read Register Resp header failed");
+               return -1;
+       }
+       _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+       _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
+       _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+       _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
+
+       if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
+       {
+               this->_registered = true;
+               this->_seqNumber++;
+               _logger->log_info("Flow Control Protocol Register success");
+
+               const uint32_t payloadLen = hdr.payloadLen;
+               // readData() takes an int length; refuse what would not fit
+               if (payloadLen > 0x7FFFFFFFu)
+               {
+                       _logger->log_error("Flow Control Protocol Register payload too large");
+                       close(_socket);
+                       _socket = 0;
+                       return -1;
+               }
+
+               uint8_t *payload = new uint8_t[payloadLen];
+               uint8_t *payloadPtr = payload;
+               uint8_t *const payloadEnd = payload + payloadLen;
+
+               // An empty payload is valid. readData() returns 0 for a
+               // zero-length read, which must not be taken for a failure.
+               if (payloadLen > 0)
+               {
+                       status = readData(payload, payloadLen);
+                       if (status <= 0)
+                       {
+                               delete[] payload;
+                               _logger->log_info("Flow Control Protocol Register Read Payload fail");
+                               close(_socket);
+                               _socket = 0;
+                               return -1;
+                       }
+               }
+
+               // NOTE: the bounds checks assume decode() of a uint32_t
+               // consumes exactly sizeof(uint32_t) bytes.
+               while (payloadPtr < payloadEnd)
+               {
+                       if (static_cast<size_t>(payloadEnd - payloadPtr) < sizeof(uint32_t))
+                               break;
+                       uint32_t msgID;
+                       payloadPtr = this->decode(payloadPtr, msgID);
+                       if (((FlowControlMsgID) msgID) == REPORT_INTERVAL)
+                       {
+                               // Fixed 4 bytes
+                               if (static_cast<size_t>(payloadEnd - payloadPtr) < sizeof(uint32_t))
+                                       break;
+                               uint32_t reportInterval;
+                               payloadPtr = this->decode(payloadPtr, reportInterval);
+                               _logger->log_info("Flow Control Protocol receive report interval %d ms", reportInterval);
+                               this->_reportInterval = reportInterval;
+                       }
+                       else if (((FlowControlMsgID) msgID) == FLOW_XML_CONTENT)
+                       {
+                               if (static_cast<size_t>(payloadEnd - payloadPtr) < sizeof(uint32_t))
+                                       break;
+                               uint32_t xmlLen;
+                               payloadPtr = this->decode(payloadPtr, xmlLen);
+                               _logger->log_info("Flow Control Protocol receive XML content length %d", xmlLen);
+                               // Never trust the advertised length beyond what was received
+                               if (xmlLen > static_cast<size_t>(payloadEnd - payloadPtr))
+                               {
+                                       _logger->log_error("Flow Control Protocol Register XML content truncated");
+                                       break;
+                               }
+                               time_t rawtime;
+                               struct tm *timeinfo;
+                               time(&rawtime);
+                               timeinfo = localtime(&rawtime);
+                               std::string timestamp;
+                               if (timeinfo)
+                                       timestamp = asctime(timeinfo);
+                               // asctime() ends its result with '\n'; keep it out of the file name
+                               while (!timestamp.empty() && timestamp[timestamp.size() - 1] == '\n')
+                                       timestamp.erase(timestamp.size() - 1);
+                               std::string xmlFileName = "flow.";
+                               xmlFileName += timestamp;
+                               xmlFileName += ".xml";
+                               std::ofstream fs;
+                               fs.open(xmlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+                               const bool opened = fs.is_open();
+                               if (opened)
+                               {
+                                       fs.write((const char *)payloadPtr, xmlLen);
+                                       fs.close();
+                               }
+                               // Step over the XML so a following field is decoded from the right offset
+                               payloadPtr += xmlLen;
+                               if (opened)
+                                       this->_controller->reload(xmlFileName.c_str());
+                       }
+                       else
+                       {
+                               break;
+                       }
+               }
+               delete[] payload;
+               close(_socket);
+               _socket = 0;
+               return 0;
+       }
+       else
+       {
+               _logger->log_info("Flow Control Protocol Register fail");
+               close(_socket);
+               _socket = 0;
+               return -1;
+       }
+}
+
+
+/**
+ * Send a periodic report to the flow control server and act on the reply.
+ *
+ * Connects if necessary, sends a REPORT_REQ carrying the controller name,
+ * then reads the response. Depending on the response status (and only when
+ * the sequence number matches):
+ *  - RESP_SUCCESS: the payload is parsed as a series of PROCESSOR_NAME,
+ *    PROPERTY_NAME and PROPERTY_VALUE string fields; every PROPERTY_VALUE
+ *    triggers updatePropertyValue() with the most recent processor and
+ *    property name. Parsing stops at an unknown or truncated field;
+ *  - RESP_TRIGGER_REGISTER: the registered flag is cleared;
+ *  - RESP_STOP_FLOW_CONTROLLER / RESP_START_FLOW_CONTROLLER: the controller
+ *    is stopped / started.
+ * The socket is closed before returning once a request has been sent.
+ *
+ * @return 0 when the response was handled, -1 on connection, send or read
+ *         failure or on an unexpected response
+ */
+int FlowControlProtocol::sendReportReq()
+{
+       uint16_t port = this->_serverPort;
+
+       if (this->_socket <= 0)
+               this->_socket = connectServer(_serverName.c_str(), port);
+
+       if (this->_socket <= 0)
+               return -1;
+
+       // Calculate the total payload msg size
+       uint32_t payloadSize =
+                       FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1);
+       uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+
+       uint8_t *data = new uint8_t[size];
+       uint8_t *start = data;
+
+       // encode the HDR
+       FlowControlProtocolHeader hdr;
+       hdr.msgType = REPORT_REQ;
+       hdr.payloadLen = payloadSize;
+       hdr.seqNumber  = this->_seqNumber;
+       hdr.status = RESP_SUCCESS;
+       data = this->encode(data, hdr.msgType);
+       data = this->encode(data, hdr.seqNumber);
+       data = this->encode(data, hdr.status);
+       data = this->encode(data, hdr.payloadLen);
+
+       // encode the XML name
+       data = this->encode(data, FLOW_XML_NAME);
+       data = this->encode(data, this->_controller->getName());
+
+       // send it
+       int status = sendData(start, size);
+       delete[] start;
+       if (status <= 0)
+       {
+               close(_socket);
+               _socket = 0;
+               _logger->log_error("Flow Control Protocol Send Report Req failed");
+               return -1;
+       }
+
+       // Looking for report respond
+       status = readHdr(&hdr);
+
+       if (status <= 0)
+       {
+               close(_socket);
+               _socket = 0;
+               _logger->log_error("Flow Control Protocol Read Report Resp header failed");
+               return -1;
+       }
+       _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+       _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
+       _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+       _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
+
+       if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
+       {
+               this->_seqNumber++;
+
+               const uint32_t payloadLen = hdr.payloadLen;
+               // readData() takes an int length; refuse what would not fit
+               if (payloadLen > 0x7FFFFFFFu)
+               {
+                       _logger->log_error("Flow Control Protocol Report Resp payload too large");
+                       close(_socket);
+                       _socket = 0;
+                       return -1;
+               }
+
+               uint8_t *payload = new uint8_t[payloadLen];
+               uint8_t *payloadPtr = payload;
+               uint8_t *const payloadEnd = payload + payloadLen;
+
+               // A report response without any property update has an empty
+               // payload. readData() returns 0 for a zero-length read, which
+               // must not be taken for a failure.
+               if (payloadLen > 0)
+               {
+                       status = readData(payload, payloadLen);
+                       if (status <= 0)
+                       {
+                               delete[] payload;
+                               _logger->log_info("Flow Control Protocol Report Resp Read Payload fail");
+                               close(_socket);
+                               _socket = 0;
+                               return -1;
+                       }
+               }
+
+               // Decode one length-prefixed string field at payloadPtr.
+               // Returns false, leaving 'out' untouched, when the field would
+               // run past the end of the received payload.
+               // NOTE: assumes decode() of a uint32_t consumes exactly
+               // sizeof(uint32_t) bytes.
+               auto decodeString = [&](std::string &out) -> bool
+               {
+                       if (static_cast<size_t>(payloadEnd - payloadPtr) < sizeof(uint32_t))
+                               return false;
+                       uint32_t len;
+                       payloadPtr = this->decode(payloadPtr, len);
+                       if (len > static_cast<size_t>(payloadEnd - payloadPtr))
+                               return false;
+                       // Copy at most len bytes, then cut at the first NUL: same result
+                       // as reading a C string, but safe when the terminator is missing
+                       const std::string raw(reinterpret_cast<const char *>(payloadPtr), len);
+                       out = raw.c_str();
+                       payloadPtr += len;
+                       return true;
+               };
+
+               std::string processor;
+               std::string propertyName;
+               std::string propertyValue;
+               while (payloadPtr < payloadEnd)
+               {
+                       if (static_cast<size_t>(payloadEnd - payloadPtr) < sizeof(uint32_t))
+                               break;
+                       uint32_t msgID;
+                       payloadPtr = this->decode(payloadPtr, msgID);
+                       if (((FlowControlMsgID) msgID) == PROCESSOR_NAME)
+                       {
+                               if (!decodeString(processor))
+                               {
+                                       _logger->log_error("Flow Control Protocol Report Resp payload truncated");
+                                       break;
+                               }
+                               _logger->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str());
+                       }
+                       else if (((FlowControlMsgID) msgID) == PROPERTY_NAME)
+                       {
+                               if (!decodeString(propertyName))
+                               {
+                                       _logger->log_error("Flow Control Protocol Report Resp payload truncated");
+                                       break;
+                               }
+                               _logger->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str());
+                       }
+                       else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE)
+                       {
+                               if (!decodeString(propertyValue))
+                               {
+                                       _logger->log_error("Flow Control Protocol Report Resp payload truncated");
+                                       break;
+                               }
+                               _logger->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str());
+                               // A value completes a (processor, name, value) triple
+                               this->_controller->updatePropertyValue(processor, propertyName, propertyValue);
+                       }
+                       else
+                       {
+                               break;
+                       }
+               }
+               delete[] payload;
+               close(_socket);
+               _socket = 0;
+               return 0;
+       }
+       else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber)
+       {
+               _logger->log_info("Flow Control Protocol trigger reregister");
+               this->_registered = false;
+               this->_seqNumber++;
+               close(_socket);
+               _socket = 0;
+               return 0;
+       }
+       else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber)
+       {
+               _logger->log_info("Flow Control Protocol stop flow controller");
+               this->_controller->stop(true);
+               this->_seqNumber++;
+               close(_socket);
+               _socket = 0;
+               return 0;
+       }
+       else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber)
+       {
+               _logger->log_info("Flow Control Protocol start flow controller");
+               this->_controller->start();
+               this->_seqNumber++;
+               close(_socket);
+               _socket = 0;
+               return 0;
+       }
+       else
+       {
+               _logger->log_info("Flow Control Protocol Report fail");
+               close(_socket);
+               _socket = 0;
+               return -1;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
new file mode 100644
index 0000000..8fbe3dc
--- /dev/null
+++ b/libminifi/src/FlowController.cpp
@@ -0,0 +1,1190 @@
+/**
+ * @file FlowController.cpp
+ * FlowController class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <libxml/parser.h>
+#include <libxml/tree.h>
+
+#include "FlowController.h"
+#include "ProcessContext.h"
+
+/**
+ * Create a flow controller with default settings and work out which
+ * configuration file it will load.
+ *
+ * The file name comes from the nifi_flow_configuration_file property when
+ * that is set, otherwise from DEFAULT_FLOW_YAML_FILE_NAME. A name that does
+ * not start with '/' is taken relative to the configured home directory.
+ * The result is canonicalised with realpath(); when that fails the error
+ * is logged and the unresolved path is kept.
+ *
+ * @param name controller name
+ */
+FlowController::FlowController(std::string name)
+: _name(name)
+{
+       uuid_generate(_uuid);
+
+       // Setup the default values
+       _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
+       _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
+       _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
+       _running = false;
+       _initialized = false;
+       _root = NULL;
+       _logger = Logger::getLogger();
+       _protocol = new FlowControlProtocol(this);
+
+       // NiFi config properties
+       _configure = Configure::getConfigure();
+
+       std::string rawConfigFileString;
+       _configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
+
+       if (!rawConfigFileString.empty())
+       {
+               _configurationFileName = rawConfigFileString;
+       }
+
+       char *path = NULL;
+       char full_path[PATH_MAX];
+
+       std::string adjustedFilename;
+       if (!_configurationFileName.empty())
+       {
+               // perform a naive determination if this is a relative path
+               if (_configurationFileName.c_str()[0] != '/')
+               {
+                       adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName;
+               }
+               else
+               {
+                       adjustedFilename = _configurationFileName;
+               }
+       }
+
+       path = realpath(adjustedFilename.c_str(), full_path);
+       if (!path)
+       {
+               _logger->log_error("Could not locate path from provided configuration file name.");
+               // realpath() returned NULL, which must not be turned into a
+               // std::string; fall back to the unresolved path instead
+               _configurationFileName = adjustedFilename;
+       }
+       else
+       {
+               _configurationFileName = path;
+       }
+       _logger->log_info("FlowController NiFi Configuration file %s", _configurationFileName.c_str());
+
+       // Create repos for flow record and provenance
+
+       _logger->log_info("FlowController %s created", _name.c_str());
+}
+
+/**
+ * Shut the controller down: stop processing, release the loaded flow and
+ * destroy the flow control protocol object created by the constructor.
+ */
+FlowController::~FlowController()
+{
+       this->stop(true);
+       this->unload();
+
+       delete this->_protocol;
+}
+
+/**
+ * @return the current value of the running flag
+ */
+bool FlowController::isRunning()
+{
+       return this->_running;
+}
+
+/**
+ * @return the current value of the initialized flag
+ */
+bool FlowController::isInitialized()
+{
+       return this->_initialized;
+}
+
+/**
+ * Stop a running controller: stop the timer scheduler, wait one second,
+ * stop processing on the root process group (if any) and clear the running
+ * flag. Has no effect when the controller is not running.
+ *
+ * @param force not consulted by this implementation
+ */
+void FlowController::stop(bool force)
+{
+       if (!_running)
+               return;
+
+       _logger->log_info("Stop Flow Controller");
+       this->_timerScheduler.stop();
+
+       // Wait for sometime for thread stop
+       const std::chrono::milliseconds settleTime(1000);
+       std::this_thread::sleep_for(settleTime);
+
+       if (this->_root)
+               this->_root->stopProcessing(&this->_timerScheduler);
+
+       _running = false;
+}
+
+/**
+ * Release the loaded flow. A running controller is stopped first. When the
+ * controller is initialized, the root process group is deleted, the
+ * initialized flag is cleared and the controller name is reset to the
+ * empty string.
+ */
+void FlowController::unload()
+{
+       if (_running)
+               stop(true);
+
+       if (!_initialized)
+               return;
+
+       _logger->log_info("Unload Flow Controller");
+       // deleting a NULL pointer is a no-op, so no guard is needed
+       delete _root;
+       _root = NULL;
+       _initialized = false;
+       _name = "";
+}
+
+/**
+ * Replace the running flow with the one described by xmlFile.
+ *
+ * The current flow is stopped and unloaded, the new file is loaded as XML
+ * and the controller is started. If no root process group exists
+ * afterwards, the previous configuration file name is restored and the
+ * same stop / unload / load / start sequence is repeated with it.
+ * NOTE(review): the rollback also loads the previous file as XML - confirm
+ * that this matches the format of the previous file.
+ *
+ * @param xmlFile path of the XML flow definition to switch to
+ */
+void FlowController::reload(std::string xmlFile)
+{
+       _logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str());
+
+       stop(true);
+       unload();
+
+       // Remember the current file so that it can be restored on failure
+       const std::string previousFile = this->_configurationFileName;
+       this->_configurationFileName = xmlFile;
+       load(ConfigFormat::XML);
+       start();
+
+       if (this->_root)
+               return;
+
+       // No root process group after the load: go back to the previous file
+       this->_configurationFileName = previousFile;
+       _logger->log_info("Rollback Flow Controller to xml %s", previousFile.c_str());
+       stop(true);
+       unload();
+       load(ConfigFormat::XML);
+       start();
+}
+
+/**
+ * Instantiate and initialize the processor whose ProcessorName equals name.
+ *
+ * @param name processor type name; also passed to the new processor
+ * @param uuid identifier passed to the new processor
+ * @return the initialized processor, or NULL (after logging an error) when
+ *         name matches none of the known processor types
+ */
+Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
+{
+       Processor *created = NULL;
+
+       if (name == GenerateFlowFile::ProcessorName)
+               created = new GenerateFlowFile(name, uuid);
+       else if (name == LogAttribute::ProcessorName)
+               created = new LogAttribute(name, uuid);
+       else if (name == RealTimeDataCollector::ProcessorName)
+               created = new RealTimeDataCollector(name, uuid);
+       else if (name == GetFile::ProcessorName)
+               created = new GetFile(name, uuid);
+       else if (name == TailFile::ProcessorName)
+               created = new TailFile(name, uuid);
+       else if (name == ListenSyslog::ProcessorName)
+               created = new ListenSyslog(name, uuid);
+       else if (name == ExecuteProcess::ProcessorName)
+               created = new ExecuteProcess(name, uuid);
+
+       // Still NULL means that no known processor type matched
+       if (!created)
+       {
+               _logger->log_error("No Processor defined for %s", name.c_str());
+               return NULL;
+       }
+
+       //! initialize the processor
+       created->initialize();
+
+       return created;
+}
+
+/**
+ * Allocate a process group of type ROOT_PROCESS_GROUP.
+ *
+ * @param name group name
+ * @param uuid group identifier
+ * @return the newly allocated group
+ */
+ProcessGroup *FlowController::createRootProcessGroup(std::string name, uuid_t uuid)
+{
+       ProcessGroup *rootGroup = new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
+       return rootGroup;
+}
+
+/**
+ * Allocate a process group of type REMOTE_PROCESS_GROUP.
+ *
+ * @param name group name
+ * @param uuid group identifier
+ * @return the newly allocated group
+ */
+ProcessGroup *FlowController::createRemoteProcessGroup(std::string name, uuid_t uuid)
+{
+       ProcessGroup *remoteGroup = new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
+       return remoteGroup;
+}
+
+/**
+ * Allocates a Connection with the given name and identifier.
+ *
+ * @param name name given to the new connection
+ * @param uuid identifier given to the new connection
+ * @return the newly allocated connection
+ */
+Connection *FlowController::createConnection(std::string name, uuid_t uuid)
+{
+       Connection *created = new Connection(name, uuid);
+       return created;
+}
+
+/**
+ * Builds a Connection from an XML connection element and adds it to the
+ * parent group.
+ *
+ * Recognised child elements: id, name, sourceId, destinationId,
+ * maxWorkQueueSize, maxWorkQueueDataSize and relationship. The Connection is
+ * created when the name element is reached, using the id parsed so far (a
+ * random UUID if none was seen). Settings that appear before the name element
+ * are skipped because there is no connection to apply them to yet.
+ *
+ * @param doc    XML document the node belongs to (not used by this function)
+ * @param node   the connection element whose children are scanned
+ * @param parent group that receives the connection; logged and ignored if NULL
+ */
+void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent)
+{
+       uuid_t uuid;
+       xmlNode *currentNode;
+       Connection *connection = NULL;
+
+       if (!parent)
+       {
+               _logger->log_error("parseConnection: no parent group existed");
+               return;
+       }
+
+       // Fallback id, overwritten when the element carries an id child
+       uuid_generate(uuid);
+
+       for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next)
+       {
+               if (currentNode->type != XML_ELEMENT_NODE)
+                       continue;
+
+               if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0)
+               {
+                       char *id = (char *) xmlNodeGetContent(currentNode);
+                       if (id) {
+                               _logger->log_debug("parseConnection: id => [%s]", id);
+                               uuid_parse(id, uuid);
+                               xmlFree(id);
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+                       char *name = (char *) xmlNodeGetContent(currentNode);
+                       if (name) {
+                               _logger->log_debug("parseConnection: name => [%s]", name);
+                               connection = this->createConnection(name, uuid);
+                               xmlFree(name);
+                               if (connection == NULL)
+                                       return;
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "sourceId") == 0) {
+                       char *id = (char *) xmlNodeGetContent(currentNode);
+                       if (id) {
+                               // Parse into a dedicated buffer so the connection's own id
+                               // is never overwritten by an endpoint id
+                               uuid_t sourceUuid;
+                               _logger->log_debug("parseConnection: sourceId => [%s]", id);
+                               if (uuid_parse(id, sourceUuid) == 0 && connection)
+                                       connection->setSourceProcessorUUID(sourceUuid);
+                               xmlFree(id);
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "destinationId") == 0) {
+                       char *id = (char *) xmlNodeGetContent(currentNode);
+                       if (id) {
+                               uuid_t destinationUuid;
+                               _logger->log_debug("parseConnection: destinationId => [%s]", id);
+                               if (uuid_parse(id, destinationUuid) == 0 && connection)
+                                       connection->setDestinationProcessorUUID(destinationUuid);
+                               xmlFree(id);
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueSize") == 0) {
+                       char *temp = (char *) xmlNodeGetContent(currentNode);
+                       int64_t maxWorkQueueSize = 0;
+                       if (temp) {
+                               if (Property::StringToInt(temp, maxWorkQueueSize)) {
+                                       // Cast so the argument width matches the %lld conversion
+                                       _logger->log_debug("parseConnection: maxWorkQueueSize => [%lld]",
+                                                       static_cast<long long>(maxWorkQueueSize));
+                                       if (connection)
+                                               connection->setMaxQueueSize(maxWorkQueueSize);
+                               }
+                               xmlFree(temp);
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueDataSize") == 0) {
+                       char *temp = (char *) xmlNodeGetContent(currentNode);
+                       int64_t maxWorkQueueDataSize = 0;
+                       if (temp) {
+                               if (Property::StringToInt(temp, maxWorkQueueDataSize)) {
+                                       _logger->log_debug("parseConnection: maxWorkQueueDataSize => [%lld]",
+                                                       static_cast<long long>(maxWorkQueueDataSize));
+                                       if (connection)
+                                               connection->setMaxQueueDataSize(maxWorkQueueDataSize);
+                               }
+                               xmlFree(temp);
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "relationship") == 0) {
+                       char *temp = (char *) xmlNodeGetContent(currentNode);
+                       if (temp) {
+                               std::string relationshipName = temp;
+                               xmlFree(temp);
+                               if (!relationshipName.empty()) {
+                                       Relationship relationship(relationshipName, "");
+                                       _logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str());
+                                       if (connection)
+                                               connection->setRelationship(relationship);
+                               } else {
+                                       // An empty element selects the default-constructed relationship
+                                       Relationship empty;
+                                       _logger->log_debug("parseConnection: relationship => [%s]", empty.getName().c_str());
+                                       if (connection)
+                                               connection->setRelationship(empty);
+                               }
+                       }
+               }
+       } // for node
+
+       if (connection)
+               parent->addConnection(connection);
+}
+
+/**
+ * Walks the children of the root process group XML element.
+ *
+ * The group is created when the name child is reached, using the id parsed so
+ * far (a random UUID if none was seen), and is stored as this controller's
+ * root together with its name. processor, connection and remoteProcessGroup
+ * children are handed to their own parsers along with the current group
+ * pointer, which is still NULL if the name child has not been seen yet.
+ *
+ * @param doc  XML document, forwarded to the child parsers
+ * @param node the root process group element
+ */
+void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node) {
+       ProcessGroup *rootGroup = NULL;
+       uuid_t groupId;
+
+       // Start from a random id; an id child replaces it
+       uuid_generate(groupId);
+
+       for (xmlNode *child = node->xmlChildrenNode; child != NULL; child = child->next) {
+               // Only element nodes carry configuration
+               if (child->type != XML_ELEMENT_NODE)
+                       continue;
+
+               if (xmlStrcmp(child->name, BAD_CAST "id") == 0) {
+                       char *idText = (char *) xmlNodeGetContent(child);
+                       if (idText == NULL)
+                               continue;
+                       _logger->log_debug("parseRootProcessGroup: id => [%s]", idText);
+                       uuid_parse(idText, groupId);
+                       xmlFree(idText);
+               } else if (xmlStrcmp(child->name, BAD_CAST "name") == 0) {
+                       char *nameText = (char *) xmlNodeGetContent(child);
+                       if (nameText == NULL)
+                               continue;
+                       _logger->log_debug("parseRootProcessGroup: name => [%s]", nameText);
+                       rootGroup = this->createRootProcessGroup(nameText, groupId);
+                       if (rootGroup == NULL) {
+                               xmlFree(nameText);
+                               return;
+                       }
+                       // Publish the new group as the controller's root
+                       this->_root = rootGroup;
+                       this->_name = nameText;
+                       xmlFree(nameText);
+               } else if (xmlStrcmp(child->name, BAD_CAST "processor") == 0) {
+                       this->parseProcessorNode(doc, child, rootGroup);
+               } else if (xmlStrcmp(child->name, BAD_CAST "connection") == 0) {
+                       this->parseConnection(doc, child, rootGroup);
+               } else if (xmlStrcmp(child->name, BAD_CAST "remoteProcessGroup") == 0) {
+                       this->parseRemoteProcessGroup(doc, child, rootGroup);
+               }
+       }
+}
+
+/**
+ * Creates the root process group from the YAML flow node and stores it, with
+ * its name, on this controller. The group always receives a freshly generated
+ * random UUID.
+ *
+ * @param rootFlowNode YAML node whose "name" entry names the flow
+ */
+void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
+       uuid_t uuid;
+       ProcessGroup *group = NULL;
+
+       // generate the random UUID
+       uuid_generate(uuid);
+
+       std::string flowName = rootFlowNode["name"].as<std::string>();
+
+       // Log the id that is actually assigned to the group, not the controller's own
+       char uuidStr[37];
+       uuid_unparse(uuid, uuidStr);
+       _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
+       _logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
+       group = this->createRootProcessGroup(flowName, uuid);
+       this->_root = group;
+       this->_name = flowName;
+}
+
+void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, 
ProcessGroup *parentGroup) {
+       int64_t schedulingPeriod = -1;
+       int64_t penalizationPeriod = -1;
+       int64_t yieldPeriod = -1;
+       int64_t runDurationNanos = -1;
+       uuid_t uuid;
+       Processor *processor = NULL;
+
+       if (!parentGroup) {
+               _logger->log_error("parseProcessNodeYaml: no parent group 
exists");
+               return;
+       }
+
+       if (processorsNode) {
+               // Evaluate sequence of processors
+               int numProcessors = processorsNode.size();
+               if (numProcessors < 1) {
+                       throw new std::invalid_argument("There must be at least 
one processor configured.");
+               }
+
+               std::vector<ProcessorConfig> processorConfigs;
+
+               if (processorsNode.IsSequence()) {
+                       for (YAML::const_iterator iter = 
processorsNode.begin(); iter != processorsNode.end(); ++iter) {
+                               ProcessorConfig procCfg;
+                               YAML::Node procNode = iter->as<YAML::Node>();
+
+                               procCfg.name = 
procNode["name"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: name => 
[%s]", procCfg.name.c_str());
+                               procCfg.javaClass = 
procNode["class"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: class 
=> [%s]", procCfg.javaClass.c_str());
+
+                               char uuidStr[37];
+                               uuid_unparse(_uuid, uuidStr);
+
+                               // generate the random UUID
+                               uuid_generate(uuid);
+
+                               // Determine the processor name only from the 
Java class
+                               int lastOfIdx = 
procCfg.javaClass.find_last_of(".");
+                               if (lastOfIdx != std::string::npos) {
+                                       lastOfIdx++; // if a value is found, 
increment to move beyond the .
+                                       int nameLength = 
procCfg.javaClass.length() - lastOfIdx;
+                                       std::string processorName = 
procCfg.javaClass.substr(lastOfIdx, nameLength);
+                                       processor = 
this->createProcessor(processorName, uuid);
+                               }
+
+                               if (!processor) {
+                                       _logger->log_error("Could not create a 
processor %s with name %s", procCfg.name.c_str(), uuidStr);
+                                       throw std::invalid_argument("Could not 
create processor " + procCfg.name);
+                               }
+                               processor->setName(procCfg.name);
+
+                               procCfg.maxConcurrentTasks = procNode["max 
concurrent tasks"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: max 
concurrent tasks => [%s]", procCfg.maxConcurrentTasks.c_str());
+                               procCfg.schedulingStrategy = 
procNode["scheduling strategy"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: 
scheduling strategy => [%s]",
+                                               
procCfg.schedulingStrategy.c_str());
+                               procCfg.schedulingPeriod = procNode["scheduling 
period"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: 
scheduling period => [%s]", procCfg.schedulingPeriod.c_str());
+                               procCfg.penalizationPeriod = 
procNode["penalization period"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: 
penalization period => [%s]",
+                                               
procCfg.penalizationPeriod.c_str());
+                               procCfg.yieldPeriod = procNode["yield 
period"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: yield 
period => [%s]", procCfg.yieldPeriod.c_str());
+                               procCfg.yieldPeriod = procNode["run duration 
nanos"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: run 
duration nanos => [%s]", procCfg.runDurationNanos.c_str());
+
+                               // handle auto-terminated relationships
+                               YAML::Node autoTerminatedSequence = 
procNode["auto-terminated relationships list"];
+                               std::vector<std::string> 
rawAutoTerminatedRelationshipValues;
+                               if (autoTerminatedSequence.IsSequence() && 
!autoTerminatedSequence.IsNull()
+                                               && 
autoTerminatedSequence.size() > 0) {
+                                       for (YAML::const_iterator relIter = 
autoTerminatedSequence.begin();
+                                                       relIter != 
autoTerminatedSequence.end(); ++relIter) {
+                                               std::string autoTerminatedRel = 
relIter->as<std::string>();
+                                               
rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
+                                       }
+                               }
+                               procCfg.autoTerminatedRelationships = 
rawAutoTerminatedRelationshipValues;
+
+                               // handle processor properties
+                               YAML::Node propertiesNode = 
procNode["Properties"];
+                               parsePropertiesNodeYaml(&propertiesNode, 
processor);
+
+                               // Take care of scheduling
+                               TimeUnit unit;
+                               if 
(Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
+                                               && 
Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+                                       _logger->log_debug("convert: 
parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
+                                       
processor->setSchedulingPeriodNano(schedulingPeriod);
+                               }
+
+                               if 
(Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
+                                               && 
Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+                                       _logger->log_debug("convert: 
parseProcessorNode: penalizationPeriod => [%d] ms",
+                                                       penalizationPeriod);
+                                       
processor->setPenalizationPeriodMsec(penalizationPeriod);
+                               }
+
+                               if (Property::StringToTime(procCfg.yieldPeriod, 
yieldPeriod, unit)
+                                               && 
Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+                                       _logger->log_debug("convert: 
parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
+                                       
processor->setYieldPeriodMsec(yieldPeriod);
+                               }
+
+                               // Default to running
+                               processor->setScheduledState(RUNNING);
+
+                               if (procCfg.schedulingStrategy == 
"TIMER_DRIVEN") {
+                                       
processor->setSchedulingStrategy(TIMER_DRIVEN);
+                                       _logger->log_debug("setting scheduling 
strategy as %s", procCfg.schedulingStrategy.c_str());
+                               } else if (procCfg.schedulingStrategy == 
"EVENT_DRIVEN") {
+                                       
processor->setSchedulingStrategy(EVENT_DRIVEN);
+                                       _logger->log_debug("setting scheduling 
strategy as %s", procCfg.schedulingStrategy.c_str());
+                               } else {
+                                       
processor->setSchedulingStrategy(CRON_DRIVEN);
+                                       _logger->log_debug("setting scheduling 
strategy as %s", procCfg.schedulingStrategy.c_str());
+
+                               }
+
+                               int64_t maxConcurrentTasks;
+                               if 
(Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
+                                       _logger->log_debug("parseProcessorNode: 
maxConcurrentTasks => [%d]", maxConcurrentTasks);
+                                       
processor->setMaxConcurrentTasks(maxConcurrentTasks);
+                               }
+
+                               if 
(Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
+                                       _logger->log_debug("parseProcessorNode: 
runDurationNanos => [%d]", runDurationNanos);
+                                       
processor->setRunDurationNano(runDurationNanos);
+                               }
+
+                               std::set<Relationship> 
autoTerminatedRelationships;
+                               for (auto&& relString : 
procCfg.autoTerminatedRelationships) {
+                                       Relationship relationship(relString, 
"");
+                                       _logger->log_debug("parseProcessorNode: 
autoTerminatedRelationship  => [%s]", relString.c_str());
+                                       
autoTerminatedRelationships.insert(relationship);
+                               }
+
+                               
processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+
+                               parentGroup->addProcessor(processor);
+                       }
+               }
+       } else {
+               throw new std::invalid_argument(
+                               "Cannot instantiate a MiNiFi instance without a 
defined Processors configuration node.");
+       }
+}
+
+/**
+ * Creates one remote process group per entry of the YAML sequence, attaches
+ * it to parentGroup and parses its "Input Ports" as SEND ports.
+ *
+ * Each group gets a random UUID, is marked as transmitting and is given the
+ * entry's url. The "yield period" and "timeout" entries are applied only when
+ * they parse as time values.
+ *
+ * @param rpgNode     YAML node holding the sequence of remote process groups;
+ *                    nothing happens if it is NULL or not a sequence
+ * @param parentGroup group that receives the remote groups; logged and
+ *                    ignored if NULL
+ */
+void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGroup *parentGroup) {
+       if (!parentGroup) {
+               _logger->log_error("parseRemoteProcessGroupYaml: no parent group exists");
+               return;
+       }
+
+       if (!rpgNode || !rpgNode->IsSequence()) {
+               return;
+       }
+
+       for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
+               // Named differently from the parameter so the two cannot be confused
+               YAML::Node currentRpg = iter->as<YAML::Node>();
+
+               // Read every entry before allocating, so a missing key cannot
+               // leave a half-configured group attached to the parent
+               std::string name = currentRpg["name"].as<std::string>();
+               _logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str());
+
+               std::string url = currentRpg["url"].as<std::string>();
+               _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str());
+
+               std::string timeout = currentRpg["timeout"].as<std::string>();
+               _logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str());
+
+               std::string yieldPeriod = currentRpg["yield period"].as<std::string>();
+               _logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod.c_str());
+
+               YAML::Node inputPorts = currentRpg["Input Ports"].as<YAML::Node>();
+
+               // generate the random UUID
+               uuid_t uuid;
+               uuid_generate(uuid);
+
+               ProcessGroup *group = this->createRemoteProcessGroup(name, uuid);
+               group->setParent(parentGroup);
+               parentGroup->addProcessGroup(group);
+
+               TimeUnit unit;
+               int64_t timeoutValue = -1;
+               int64_t yieldPeriodValue = -1;
+
+               // The casts make the argument width match the %lld conversion
+               if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
+                               && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue)) {
+                       _logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%lld] ms",
+                                       static_cast<long long>(yieldPeriodValue));
+                       group->setYieldPeriodMsec(yieldPeriodValue);
+               }
+
+               if (Property::StringToTime(timeout, timeoutValue, unit)
+                               && Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue)) {
+                       _logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%lld] ms",
+                                       static_cast<long long>(timeoutValue));
+                       group->setTimeOut(timeoutValue);
+               }
+
+               group->setTransmitting(true);
+               group->setURL(url);
+
+               if (inputPorts.IsSequence()) {
+                       for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
+                               _logger->log_debug("Got a current port, iterating...");
+
+                               YAML::Node currPort = portIter->as<YAML::Node>();
+
+                               this->parsePortYaml(&currPort, group, SEND);
+                       } // for node
+               }
+       }
+}
+
+/**
+ * Creates one Connection per entry of the YAML connections sequence and adds
+ * it to the parent group.
+ *
+ * The source processor is looked up by name in the root group; the
+ * destination is looked up by name first and, failing that, by treating the
+ * "destination name" entry as a UUID. Both endpoints are resolved before the
+ * Connection is allocated, so a failed lookup does not leave an orphaned
+ * connection behind. Each connection gets a random UUID and is added once.
+ *
+ * @param connectionsNode YAML node holding the sequence of connections;
+ *                        nothing happens if it is NULL or not a sequence
+ * @param parent          group that receives the connections; logged and
+ *                        ignored if NULL
+ * @throws std::invalid_argument if the sequence is empty or an endpoint
+ *         processor cannot be found
+ */
+void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGroup *parent) {
+       if (!parent) {
+               _logger->log_error("parseConnectionYaml: no parent group was provided");
+               return;
+       }
+
+       if (!connectionsNode) {
+               return;
+       }
+
+       // Thrown by value so that handlers catching std::exception see it
+       if (connectionsNode->size() < 1) {
+               throw std::invalid_argument("There must be at least one connection configured.");
+       }
+
+       if (!connectionsNode->IsSequence()) {
+               return;
+       }
+
+       // Processor lookups below go through the root group
+       if (!this->_root) {
+               _logger->log_error("parseConnectionYaml: no root process group to resolve processors against");
+               return;
+       }
+
+       for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
+               YAML::Node connectionNode = iter->as<YAML::Node>();
+
+               std::string name = connectionNode["name"].as<std::string>();
+               std::string destName = connectionNode["destination name"].as<std::string>();
+               std::string rawRelationship = connectionNode["source relationship name"].as<std::string>();
+               std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
+
+               Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName);
+               if (!srcProcessor) {
+                       _logger->log_error("Could not locate a source with name %s to create a connection",
+                                       connectionSrcProcName.c_str());
+                       throw std::invalid_argument(
+                                       "Could not locate a source with name " + connectionSrcProcName + " to create a connection");
+               }
+
+               Processor *destProcessor = this->_root->findProcessor(destName);
+               // If we could not find name, try by UUID, but only when the text
+               // really is a UUID so the lookup never sees an unset buffer
+               if (!destProcessor) {
+                       uuid_t parsedDestUuid;
+                       if (uuid_parse(destName.c_str(), parsedDestUuid) == 0) {
+                               destProcessor = this->_root->findProcessor(parsedDestUuid);
+                       }
+               }
+               if (!destProcessor) {
+                       _logger->log_error("Could not locate a destination with name %s to create a connection",
+                                       destName.c_str());
+                       throw std::invalid_argument(
+                                       "Could not locate a destination with name " + destName + " to create a connection");
+               }
+
+               // generate the random UUID
+               uuid_t uuid;
+               uuid_generate(uuid);
+               char uuidStr[37];
+               uuid_unparse(uuid, uuidStr);
+
+               Connection *connection = this->createConnection(name, uuid);
+               _logger->log_debug("Created connection with UUID %s and name %s", uuidStr, name.c_str());
+
+               Relationship relationship(rawRelationship, "");
+               _logger->log_debug("parseConnection: relationship => [%s]", rawRelationship.c_str());
+               connection->setRelationship(relationship);
+
+               uuid_t srcUuid;
+               uuid_t destUuid;
+               srcProcessor->getUUID(srcUuid);
+               connection->setSourceProcessorUUID(srcUuid);
+               destProcessor->getUUID(destUuid);
+               connection->setDestinationProcessorUUID(destUuid);
+
+               parent->addConnection(connection);
+       }
+}
+
+/**
+ * Builds a remote process group from an XML remoteProcessGroup element and
+ * attaches it to the parent group.
+ *
+ * Recognised child elements: id, name, yieldPeriod, timeout, transmitting,
+ * inputPort and outputPort. The group is created and attached when the name
+ * element is reached, using the id parsed so far (a random UUID if none was
+ * seen). Settings and ports that appear before the name element are skipped
+ * because there is no group to apply them to yet.
+ *
+ * @param doc    XML document, forwarded to the port parser
+ * @param node   the remoteProcessGroup element whose children are scanned
+ * @param parent group that receives the remote group; logged and ignored if
+ *               NULL
+ */
+void FlowController::parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) {
+       uuid_t uuid;
+       xmlNode *currentNode;
+       ProcessGroup *group = NULL;
+       int64_t yieldPeriod = -1;
+       int64_t timeOut = -1;
+
+       // The parent is dereferenced as soon as the group is created
+       if (!parent) {
+               _logger->log_error("parseRemoteProcessGroup: no parent group existed");
+               return;
+       }
+
+       // generate the random UUID
+       uuid_generate(uuid);
+
+       for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+               if (currentNode->type != XML_ELEMENT_NODE)
+                       continue;
+
+               if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+                       char *id = (char *) xmlNodeGetContent(currentNode);
+                       if (id) {
+                               _logger->log_debug("parseRemoteProcessGroup: id => [%s]", id);
+                               uuid_parse(id, uuid);
+                               xmlFree(id);
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+                       char *name = (char *) xmlNodeGetContent(currentNode);
+                       if (name) {
+                               _logger->log_debug("parseRemoteProcessGroup: name => [%s]", name);
+                               group = this->createRemoteProcessGroup(name, uuid);
+                               xmlFree(name);
+                               if (group == NULL)
+                                       return;
+                               group->setParent(parent);
+                               parent->addProcessGroup(group);
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
+                       TimeUnit unit;
+                       char *temp = (char *) xmlNodeGetContent(currentNode);
+                       if (temp) {
+                               if (Property::StringToTime(temp, yieldPeriod, unit)
+                                               && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) {
+                                       // Cast so the argument width matches the %lld conversion
+                                       _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%lld] ms",
+                                                       static_cast<long long>(yieldPeriod));
+                                       group->setYieldPeriodMsec(yieldPeriod);
+                               }
+                               xmlFree(temp);
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0) {
+                       TimeUnit unit;
+                       char *temp = (char *) xmlNodeGetContent(currentNode);
+                       if (temp) {
+                               if (Property::StringToTime(temp, timeOut, unit)
+                                               && Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) {
+                                       _logger->log_debug("parseRemoteProcessGroup: timeOut => [%lld] ms",
+                                                       static_cast<long long>(timeOut));
+                                       group->setTimeOut(timeOut);
+                               }
+                               xmlFree(temp);
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0) {
+                       char *temp = (char *) xmlNodeGetContent(currentNode);
+                       bool transmitting = false;
+                       if (temp) {
+                               if (Property::StringToBool(temp, transmitting) && group) {
+                                       _logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting);
+                                       group->setTransmitting(transmitting);
+                               }
+                               xmlFree(temp);
+                       }
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group) {
+                       this->parsePort(doc, currentNode, group, SEND);
+               } else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group) {
+                       this->parsePort(doc, currentNode, group, RECEIVE);
+               }
+       } // for node
+}
+
+/**
+ * Parse one property element and apply it to a processor.
+ *
+ * Walks the element children of @p node and remembers the text of the most
+ * recent <name> and <value> children. After each element child has been
+ * examined, the pair is passed to Processor::setProperty() whenever both
+ * strings are non-empty.
+ *
+ * @param doc        XML document being parsed (not used by this function)
+ * @param node       element whose children are scanned
+ * @param processor  processor that receives the property; when NULL an error
+ *                   is logged and nothing is parsed
+ */
+void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor) {
+    if (processor == NULL) {
+        _logger->log_error("parseProcessorProperty: no parent processor existed");
+        return;
+    }
+
+    std::string key;
+    std::string val;
+
+    xmlNode *child = node->xmlChildrenNode;
+    while (child != NULL) {
+        if (child->type == XML_ELEMENT_NODE) {
+            const bool isName = (xmlStrcmp(child->name, BAD_CAST "name") == 0);
+            const bool isValue = (xmlStrcmp(child->name, BAD_CAST "value") == 0);
+
+            if (isName) {
+                char *text = (char *) xmlNodeGetContent(child);
+                if (text != NULL) {
+                    _logger->log_debug("parseProcessorNode: name => [%s]", text);
+                    key = text;
+                    xmlFree(text);
+                }
+            }
+            if (isValue) {
+                char *text = (char *) xmlNodeGetContent(child);
+                if (text != NULL) {
+                    _logger->log_debug("parseProcessorNode: value => [%s]", text);
+                    val = text;
+                    xmlFree(text);
+                }
+            }
+
+            // Hand the pair over as soon as both halves are known
+            if (!(key.empty() || val.empty())) {
+                processor->setProperty(key, val);
+            }
+        }
+        child = child->next;
+    }
+}
+
+/**
+ * Create a remote process group port from a YAML port entry and add it to
+ * @p parent.
+ *
+ * The entry must provide "id" and "name"; "Properties" and
+ * "max concurrent tasks" are optional. The port inherits the timeout and
+ * yield period of @p parent, is marked as transmitting and is put into the
+ * RUNNING scheduled state.
+ *
+ * @param portNode   YAML node describing the port
+ * @param parent     process group that receives the port; when NULL an error
+ *                   is logged and nothing is created
+ * @param direction  transfer direction assigned to the port
+ *
+ * NOTE(review): a missing "id" or "name" key still makes yaml-cpp throw from
+ * as<std::string>(); callers are presumably expected to handle that.
+ */
+void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction) {
+    uuid_t uuid;
+    Processor *processor = NULL;
+    RemoteProcessorGroupPort *port = NULL;
+
+    if (!parent) {
+        _logger->log_error("parseProcessNode: no parent group existed");
+        return;
+    }
+
+    YAML::Node inputPortsObj = portNode->as<YAML::Node>();
+
+    // Start from a random UUID so the port still gets an identifier when the
+    // configured id cannot be parsed.
+    uuid_generate(uuid);
+
+    auto portId = inputPortsObj["id"].as<std::string>();
+    auto nameStr = inputPortsObj["name"].as<std::string>();
+    if (uuid_parse(portId.c_str(), uuid) != 0) {
+        _logger->log_warn("parsePortYaml: id [%s] is not a valid UUID, using a generated one", portId.c_str());
+    }
+
+    port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
+
+    processor = (Processor *) port;
+    port->setDirection(direction);
+    port->setTimeOut(parent->getTimeOut());
+    port->setTransmitting(true);
+    processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
+    processor->initialize();
+
+    // handle port properties
+    YAML::Node propertiesNode = inputPortsObj["Properties"];
+    parsePropertiesNodeYaml(&propertiesNode, processor);
+
+    // add processor to parent
+    parent->addProcessor(processor);
+    processor->setScheduledState(RUNNING);
+
+    // "max concurrent tasks" is optional: only touch the processor when the
+    // key is present and holds a valid integer. The previous code passed an
+    // uninitialised value to the processor when the conversion failed.
+    YAML::Node maxTasksNode = inputPortsObj["max concurrent tasks"];
+    if (maxTasksNode.IsDefined() && !maxTasksNode.IsNull()) {
+        auto rawMaxConcurrentTasks = maxTasksNode.as<std::string>();
+        int64_t maxConcurrentTasks = 0;
+        if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
+            _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%lld]",
+                    static_cast<long long>(maxConcurrentTasks));
+            processor->setMaxConcurrentTasks(maxConcurrentTasks);
+        } else {
+            _logger->log_warn("parsePortYaml: invalid max concurrent tasks value [%s]", rawMaxConcurrentTasks.c_str());
+        }
+    }
+}
+
+/**
+ * Parse a remote process group port element and add the port to @p parent.
+ *
+ * Recognised children: <id>, <name>, <scheduledState>, <maxConcurrentTasks>
+ * and <property>. The port object is created when <name> is encountered, so
+ * settings that appear before <name> (or in an element without <name>) have
+ * nothing to apply to; they are reported and skipped instead of dereferencing
+ * a NULL processor.
+ *
+ * @param doc            XML document being parsed
+ * @param processorNode  the port element
+ * @param parent         process group that receives the port; when NULL an
+ *                       error is logged and nothing is parsed
+ * @param direction      transfer direction assigned to the port
+ */
+void FlowController::parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction) {
+    uuid_t uuid;
+    xmlNode *currentNode;
+    Processor *processor = NULL;
+    RemoteProcessorGroupPort *port = NULL;
+
+    if (!parent) {
+        _logger->log_error("parseProcessNode: no parent group existed");
+        return;
+    }
+    // generate a random UUID; it is overwritten when an <id> child parses
+    uuid_generate(uuid);
+
+    // True once the port exists; otherwise reports the skipped element.
+    auto portReady = [this, &processor](const char *element) -> bool {
+        if (processor != NULL)
+            return true;
+        _logger->log_error("parsePort: ignoring [%s] because no port has been created yet", element);
+        return false;
+    };
+
+    for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+        if (currentNode->type != XML_ELEMENT_NODE)
+            continue;
+
+        if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+            char *id = (char *) xmlNodeGetContent(currentNode);
+            if (id) {
+                _logger->log_debug("parseProcessorNode: id => [%s]", id);
+                uuid_parse(id, uuid);
+                xmlFree(id);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+            char *name = (char *) xmlNodeGetContent(currentNode);
+            if (name) {
+                _logger->log_debug("parseProcessorNode: name => [%s]", name);
+                // operator new throws on failure, so no NULL check is needed
+                port = new RemoteProcessorGroupPort(name, uuid);
+                processor = (Processor *) port;
+                port->setDirection(direction);
+                port->setTimeOut(parent->getTimeOut());
+                port->setTransmitting(parent->getTransmitting());
+                processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
+                processor->initialize();
+                // add processor to parent
+                parent->addProcessor(processor);
+                xmlFree(name);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                std::string state = temp;
+                if (portReady("scheduledState")) {
+                    if (state == "DISABLED") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(DISABLED);
+                    } else if (state == "STOPPED") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(STOPPED);
+                    } else if (state == "RUNNING") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(RUNNING);
+                    }
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                int64_t maxConcurrentTasks = 0;
+                if (portReady("maxConcurrentTasks") && Property::StringToInt(temp, maxConcurrentTasks)) {
+                    _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%lld]",
+                            static_cast<long long>(maxConcurrentTasks));
+                    processor->setMaxConcurrentTasks(maxConcurrentTasks);
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
+            // parseProcessorProperty reports a NULL processor itself
+            this->parseProcessorProperty(doc, currentNode, processor);
+        }
+    } // for node
+}
+
+/**
+ * Parse a processor element, create the processor and add it to @p parent.
+ *
+ * The processor is created by createProcessor() when the <name> child is
+ * encountered, using the UUID from a preceding <id> child or a generated one.
+ * Every other recognised child configures that processor. Children that show
+ * up before the processor exists are reported and skipped instead of
+ * dereferencing a NULL pointer.
+ *
+ * All <autoTerminatedRelationship> children are accumulated, so a processor
+ * can auto-terminate several relationships; previously each child replaced
+ * the set and only the last one survived.
+ *
+ * @param doc            XML document being parsed
+ * @param processorNode  the processor element
+ * @param parent         process group that receives the processor; when NULL
+ *                       an error is logged and nothing is parsed
+ */
+void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent) {
+    uuid_t uuid;
+    xmlNode *currentNode;
+    Processor *processor = NULL;
+    // Relationships collected from every <autoTerminatedRelationship> child
+    std::set<Relationship> autoTerminated;
+
+    if (!parent) {
+        _logger->log_error("parseProcessNode: no parent group existed");
+        return;
+    }
+    // generate a random UUID; it is overwritten when an <id> child parses
+    uuid_generate(uuid);
+
+    // True once the processor exists; otherwise reports the skipped element.
+    auto processorReady = [this, &processor](const char *element) -> bool {
+        if (processor != NULL)
+            return true;
+        _logger->log_error("parseProcessorNode: ignoring [%s] because no processor has been created yet", element);
+        return false;
+    };
+
+    for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+        if (currentNode->type != XML_ELEMENT_NODE)
+            continue;
+
+        if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
+            char *id = (char *) xmlNodeGetContent(currentNode);
+            if (id) {
+                _logger->log_debug("parseProcessorNode: id => [%s]", id);
+                uuid_parse(id, uuid);
+                xmlFree(id);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
+            char *name = (char *) xmlNodeGetContent(currentNode);
+            if (name) {
+                _logger->log_debug("parseProcessorNode: name => [%s]", name);
+                processor = this->createProcessor(name, uuid);
+                xmlFree(name);
+                if (processor == NULL) {
+                    // unknown processor type: nothing left to configure
+                    return;
+                }
+                // add processor to parent
+                parent->addProcessor(processor);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingPeriod") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                TimeUnit unit;
+                int64_t schedulingPeriod = -1;
+                if (processorReady("schedulingPeriod")
+                        && Property::StringToTime(temp, schedulingPeriod, unit)
+                        && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+                    _logger->log_debug("parseProcessorNode: schedulingPeriod => [%lld] ns",
+                            static_cast<long long>(schedulingPeriod));
+                    processor->setSchedulingPeriodNano(schedulingPeriod);
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "penalizationPeriod") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                TimeUnit unit;
+                int64_t penalizationPeriod = -1;
+                if (processorReady("penalizationPeriod")
+                        && Property::StringToTime(temp, penalizationPeriod, unit)
+                        && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+                    _logger->log_debug("parseProcessorNode: penalizationPeriod => [%lld] ms",
+                            static_cast<long long>(penalizationPeriod));
+                    processor->setPenalizationPeriodMsec(penalizationPeriod);
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                TimeUnit unit;
+                int64_t yieldPeriod = -1;
+                if (processorReady("yieldPeriod")
+                        && Property::StringToTime(temp, yieldPeriod, unit)
+                        && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+                    _logger->log_debug("parseProcessorNode: yieldPeriod => [%lld] ms",
+                            static_cast<long long>(yieldPeriod));
+                    processor->setYieldPeriodMsec(yieldPeriod);
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "lossTolerant") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                bool lossTolerant = false;
+                if (processorReady("lossTolerant") && Property::StringToBool(temp, lossTolerant)) {
+                    _logger->log_debug("parseProcessorNode: lossTolerant => [%d]", lossTolerant);
+                    processor->setlossTolerant(lossTolerant);
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                std::string state = temp;
+                if (processorReady("scheduledState")) {
+                    if (state == "DISABLED") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(DISABLED);
+                    } else if (state == "STOPPED") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(STOPPED);
+                    } else if (state == "RUNNING") {
+                        _logger->log_debug("parseProcessorNode: scheduledState  => [%s]", state.c_str());
+                        processor->setScheduledState(RUNNING);
+                    }
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingStrategy") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                std::string strategy = temp;
+                if (processorReady("schedulingStrategy")) {
+                    if (strategy == "TIMER_DRIVEN") {
+                        _logger->log_debug("parseProcessorNode: scheduledStrategy  => [%s]", strategy.c_str());
+                        processor->setSchedulingStrategy(TIMER_DRIVEN);
+                    } else if (strategy == "EVENT_DRIVEN") {
+                        _logger->log_debug("parseProcessorNode: scheduledStrategy  => [%s]", strategy.c_str());
+                        processor->setSchedulingStrategy(EVENT_DRIVEN);
+                    }
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                int64_t maxConcurrentTasks = 0;
+                if (processorReady("maxConcurrentTasks") && Property::StringToInt(temp, maxConcurrentTasks)) {
+                    _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%lld]",
+                            static_cast<long long>(maxConcurrentTasks));
+                    processor->setMaxConcurrentTasks(maxConcurrentTasks);
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                int64_t runDurationNanos = -1;
+                if (processorReady("runDurationNanos") && Property::StringToInt(temp, runDurationNanos)) {
+                    _logger->log_debug("parseProcessorNode: runDurationNanos => [%lld]",
+                            static_cast<long long>(runDurationNanos));
+                    processor->setRunDurationNano(runDurationNanos);
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "autoTerminatedRelationship") == 0) {
+            char *temp = (char *) xmlNodeGetContent(currentNode);
+            if (temp) {
+                std::string relationshipName = temp;
+                if (processorReady("autoTerminatedRelationship")) {
+                    Relationship relationship(relationshipName, "");
+                    // Re-apply the whole accumulated set each time
+                    autoTerminated.insert(relationship);
+                    processor->setAutoTerminatedRelationships(autoTerminated);
+                    _logger->log_debug("parseProcessorNode: autoTerminatedRelationship  => [%s]",
+                            relationshipName.c_str());
+                }
+                xmlFree(temp);
+            }
+        } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
+            // parseProcessorProperty reports a NULL processor itself
+            this->parseProcessorProperty(doc, currentNode, processor);
+        }
+    } // for node
+}
+
+/**
+ * Apply every populated entry of a YAML "Properties" map to a processor.
+ *
+ * Entries whose value is null or undefined are skipped. A warning is logged
+ * for each entry that Processor::setProperty() rejects.
+ *
+ * @param propertiesNode  YAML node whose key/value entries are iterated
+ * @param processor       processor receiving the properties
+ */
+void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor)
+{
+    // Handle each entry as a generic YAML node so it can be inspected first
+    YAML::const_iterator entry = propertiesNode->begin();
+    while (entry != propertiesNode->end())
+    {
+        std::string key = entry->first.as<std::string>();
+        YAML::Node valueNode = entry->second;
+        ++entry;
+
+        if (valueNode.IsNull() || !valueNode.IsDefined())
+        {
+            continue;
+        }
+
+        std::string text = valueNode.as<std::string>();
+        const bool accepted = processor->setProperty(key, text);
+        if (!accepted)
+        {
+            _logger->log_warn("Received property %s with value %s but is not one of the properties for %s", key.c_str(), text.c_str(), processor->getName().c_str());
+        }
+    }
+}
+
+/**
+ * Load the flow definition from the configured file.
+ *
+ * A running controller is stopped first. Nothing is loaded when the
+ * controller is already initialized.
+ *
+ * XML: the root element must be <flowController>; its <rootGroup>,
+ * <maxTimerDrivenThreadCount> and <maxEventDrivenThreadCount> children are
+ * processed. A file that cannot be read marks the controller as initialized
+ * without a flow (existing behaviour, kept as is). A document with a missing
+ * or unexpected root element leaves the controller uninitialized.
+ *
+ * YAML: the "Flow Controller", processors, "Remote Processing Groups" and
+ * "Connections" sections are parsed in that order.
+ * NOTE(review): YAML::LoadFile and the YAML parse helpers may throw; no
+ * exception is caught here.
+ *
+ * @param configFormat  format of the configuration file
+ */
+void FlowController::load(ConfigFormat configFormat) {
+    if (_running) {
+        stop(true);
+    }
+    if (!_initialized) {
+        _logger->log_info("Load Flow Controller from file %s", _configurationFileName.c_str());
+
+        if (ConfigFormat::XML == configFormat) {
+            _logger->log_info("Detected an XML configuration file for processing.");
+
+            xmlDoc *doc = xmlReadFile(_configurationFileName.c_str(), NULL, XML_PARSE_NONET);
+            if (doc == NULL) {
+                _logger->log_error("xmlReadFile returned NULL when reading [%s]", _configurationFileName.c_str());
+                _initialized = true;
+                return;
+            }
+
+            xmlNode *root = xmlDocGetRootElement(doc);
+
+            if (root == NULL) {
+                _logger->log_error("Can not get root from XML doc %s", _configurationFileName.c_str());
+                xmlFreeDoc(doc);
+                xmlCleanupParser();
+                // Without this return the code below dereferenced the NULL
+                // root and freed the document a second time.
+                return;
+            }
+
+            if (xmlStrcmp(root->name, BAD_CAST "flowController") != 0) {
+                _logger->log_error("Root name is not flowController for XML doc %s", _configurationFileName.c_str());
+                xmlFreeDoc(doc);
+                xmlCleanupParser();
+                return;
+            }
+
+            xmlNode *currentNode;
+
+            for (currentNode = root->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
+                if (currentNode->type != XML_ELEMENT_NODE)
+                    continue;
+
+                if (xmlStrcmp(currentNode->name, BAD_CAST "rootGroup") == 0) {
+                    this->parseRootProcessGroup(doc, currentNode);
+                } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxTimerDrivenThreadCount") == 0) {
+                    char *temp = (char *) xmlNodeGetContent(currentNode);
+                    if (temp) {
+                        int64_t maxTimerDrivenThreadCount = 0;
+                        if (Property::StringToInt(temp, maxTimerDrivenThreadCount)) {
+                            _logger->log_debug("maxTimerDrivenThreadCount => [%lld]",
+                                    static_cast<long long>(maxTimerDrivenThreadCount));
+                            this->_maxTimerDrivenThreads = maxTimerDrivenThreadCount;
+                        }
+                        xmlFree(temp);
+                    }
+                } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxEventDrivenThreadCount") == 0) {
+                    char *temp = (char *) xmlNodeGetContent(currentNode);
+                    if (temp) {
+                        int64_t maxEventDrivenThreadCount = 0;
+                        if (Property::StringToInt(temp, maxEventDrivenThreadCount)) {
+                            _logger->log_debug("maxEventDrivenThreadCount => [%lld]",
+                                    static_cast<long long>(maxEventDrivenThreadCount));
+                            this->_maxEventDrivenThreads = maxEventDrivenThreadCount;
+                        }
+                        xmlFree(temp);
+                    }
+                }
+            } // for
+
+            xmlFreeDoc(doc);
+            xmlCleanupParser();
+            _initialized = true;
+        } else if (ConfigFormat::YAML == configFormat) {
+            YAML::Node flow = YAML::LoadFile(_configurationFileName);
+
+            YAML::Node flowControllerNode = flow["Flow Controller"];
+            YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
+            YAML::Node connectionsNode = flow["Connections"];
+            YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"];
+
+            // Create the root process group
+            parseRootProcessGroupYaml(flowControllerNode);
+            parseProcessorNodeYaml(processorsNode, this->_root);
+            parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root);
+            parseConnectionYaml(&connectionsNode, this->_root);
+
+            _initialized = true;
+        }
+    }
+}
+
+/**
+ * Start the flow controller.
+ *
+ * Starts the timer scheduler, starts processing on the root group (when one
+ * exists), marks the controller as running and starts the protocol object.
+ * Calling it while already running does nothing.
+ *
+ * @return false when the controller has not been initialized, true otherwise
+ */
+bool FlowController::start() {
+    if (!_initialized) {
+        _logger->log_error("Can not start Flow Controller because it has not been initialized");
+        return false;
+    }
+
+    if (_running) {
+        // already started, nothing to do
+        return true;
+    }
+
+    _logger->log_info("Start Flow Controller");
+    _timerScheduler.start();
+    if (_root != NULL) {
+        _root->startProcessing(&_timerScheduler);
+    }
+    _running = true;
+    _protocol->start();
+    return true;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
new file mode 100644
index 0000000..2dda47a
--- /dev/null
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -0,0 +1,231 @@
+/**
+ * @file FlowFileRecord.cpp
+ * Flow file record class implementation 
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <sys/time.h>
+#include <time.h>
+#include <iostream>
+#include <fstream>
+#include <cstdio>
+
+#include "FlowFileRecord.h"
+#include "Relationship.h"
+#include "Logger.h"
+
+// Counter shared by all records; the source of the local flow file ids
+std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0);
+
+/**
+ * Create a flow file record with a fresh UUID and local id.
+ *
+ * The FILENAME, PATH and UUID attributes are populated first. The caller's
+ * attributes are added afterwards through addAttribute(), which never
+ * replaces an existing key, so they cannot override those three.
+ *
+ * @param attributes  additional attributes for the record
+ * @param claim       resource claim backing the content, may be NULL; its
+ *                    owned count is increased when present
+ */
+FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim)
+: _size(0),
+  // fetch_add takes the id and advances the counter in one atomic step; a
+  // separate load() and ++ let two threads obtain the same id.
+  _id(_localFlowSeqNumber.fetch_add(1)),
+  _offset(0),
+  _penaltyExpirationMs(0),
+  _claim(claim),
+  _markedDelete(false),
+  _connection(NULL),
+  _orginalConnection(NULL)
+{
+    _logger = Logger::getLogger();
+    _entryDate = getTimeMillis();
+    _lineageStartDate = _entryDate;
+    // duplicate() copies this member, so give it a defined value
+    _lastQueueDate = 0;
+    _snapshot = false;
+
+    char uuidStr[37];
+
+    // Generate the global UUID for the flow record
+    uuid_generate(_uuid);
+    uuid_unparse(_uuid, uuidStr);
+    _uuidStr = uuidStr;
+
+    // Populate the default attributes
+    addAttribute(FILENAME, std::to_string(getTimeNano()));
+    addAttribute(PATH, DEFAULT_FLOWFILE_PATH);
+    addAttribute(UUID, uuidStr);
+    // Populate the attributes from the input
+    std::map<std::string, std::string>::const_iterator it;
+    for (it = attributes.begin(); it != attributes.end(); ++it)
+    {
+        addAttribute(it->first, it->second);
+    }
+
+    if (_claim)
+    {
+        // Increase the flow file record owned count for the resource claim
+        _claim->increaseFlowFileRecordOwnedCount();
+    }
+}
+
+/**
+ * Release the record's share of its resource claim.
+ *
+ * Decreases the claim's owned count; when the count reaches zero the claim's
+ * content file is removed and the claim object is deleted.
+ */
+FlowFileRecord::~FlowFileRecord()
+{
+    const char *message = _snapshot ? "Delete SnapShot FlowFile UUID %s" : "Delete FlowFile UUID %s";
+    _logger->log_debug(message, _uuidStr.c_str());
+
+    if (!_claim)
+    {
+        return;
+    }
+
+    // Decrease the flow file record owned count for the resource claim
+    _claim->decreaseFlowFileRecordOwnedCount();
+    if (_claim->getFlowFileRecordOwnedCount() != 0)
+    {
+        // claim still in use by other records
+        return;
+    }
+
+    _logger->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str());
+    std::remove(_claim->getContentFullPath().c_str());
+    delete _claim;
+}
+
+/**
+ * Add an attribute identified by a FlowAttribute value.
+ *
+ * @return false when FlowAttributeKey() yields no key string for @p key,
+ *         otherwise the result of the string-keyed addAttribute()
+ */
+bool FlowFileRecord::addAttribute(FlowAttribute key, std::string value)
+{
+    const char *keyText = FlowAttributeKey(key);
+    if (!keyText)
+    {
+        return false;
+    }
+    std::string attributeName = keyText;
+    return addAttribute(attributeName, value);
+}
+
+/**
+ * Add a new attribute; an existing attribute is never overwritten.
+ *
+ * @return true when the attribute was added, false when @p key was already
+ *         present
+ */
+bool FlowFileRecord::addAttribute(std::string key, std::string value)
+{
+    if (_attributes.find(key) != _attributes.end())
+    {
+        // attribute already there in the map
+        return false;
+    }
+    _attributes[key] = value;
+    return true;
+}
+
+/**
+ * Remove an attribute identified by a FlowAttribute value.
+ *
+ * @return false when FlowAttributeKey() yields no key string for @p key,
+ *         otherwise the result of the string-keyed removeAttribute()
+ */
+bool FlowFileRecord::removeAttribute(FlowAttribute key)
+{
+    const char *keyText = FlowAttributeKey(key);
+    if (!keyText)
+    {
+        return false;
+    }
+    std::string attributeName = keyText;
+    return removeAttribute(attributeName);
+}
+
+/**
+ * Remove an attribute by name.
+ *
+ * @return true when the attribute existed and was removed, false otherwise
+ */
+bool FlowFileRecord::removeAttribute(std::string key)
+{
+    std::map<std::string, std::string>::iterator found = _attributes.find(key);
+    if (found == _attributes.end())
+    {
+        return false;
+    }
+    _attributes.erase(found);
+    return true;
+}
+
+/**
+ * Update an attribute identified by a FlowAttribute value.
+ *
+ * @return false when FlowAttributeKey() yields no key string for @p key,
+ *         otherwise the result of the string-keyed updateAttribute()
+ */
+bool FlowFileRecord::updateAttribute(FlowAttribute key, std::string value)
+{
+    const char *keyText = FlowAttributeKey(key);
+    if (!keyText)
+    {
+        return false;
+    }
+    std::string attributeName = keyText;
+    return updateAttribute(attributeName, value);
+}
+
+/**
+ * Replace the value of an existing attribute; unknown keys are not added.
+ *
+ * @return true when @p key existed and was updated, false otherwise
+ */
+bool FlowFileRecord::updateAttribute(std::string key, std::string value)
+{
+    std::map<std::string, std::string>::iterator found = _attributes.find(key);
+    if (found == _attributes.end())
+    {
+        return false;
+    }
+    found->second = value;
+    return true;
+}
+
+/**
+ * Look up an attribute identified by a FlowAttribute value.
+ *
+ * @return false when FlowAttributeKey() yields no key string for @p key,
+ *         otherwise the result of the string-keyed getAttribute()
+ */
+bool FlowFileRecord::getAttribute(FlowAttribute key, std::string &value)
+{
+    const char *keyText = FlowAttributeKey(key);
+    if (!keyText)
+    {
+        return false;
+    }
+    std::string attributeName = keyText;
+    return getAttribute(attributeName, value);
+}
+
+/**
+ * Look up an attribute by name.
+ *
+ * @param key    attribute name
+ * @param value  receives the attribute value; left untouched when the
+ *               attribute does not exist
+ * @return true when the attribute was found, false otherwise
+ */
+bool FlowFileRecord::getAttribute(std::string key, std::string &value)
+{
+    std::map<std::string, std::string>::iterator found = _attributes.find(key);
+    const bool present = (found != _attributes.end());
+    if (present)
+    {
+        value = found->second;
+    }
+    return present;
+}
+
+/**
+ * Turn this record into a snapshot copy of @p original.
+ *
+ * Copies the identity, timing, size and connection fields and the
+ * attributes, shares the original's resource claim (increasing its owned
+ * count when the claim is set) and marks this record as a snapshot.
+ *
+ * @param original  record to copy from
+ */
+void FlowFileRecord::duplicate(FlowFileRecord *original)
+{
+    // identity
+    uuid_copy(_uuid, original->_uuid);
+    _uuidStr = original->_uuidStr;
+    _id = original->_id;
+
+    // attributes and lineage
+    _attributes = original->_attributes;
+    _lineageIdentifiers = original->_lineageIdentifiers;
+    _lineageStartDate = original->_lineageStartDate;
+
+    // timing
+    _entryDate = original->_entryDate;
+    _lastQueueDate = original->_lastQueueDate;
+    _penaltyExpirationMs = original->_penaltyExpirationMs;
+
+    // content location
+    _offset = original->_offset;
+    _size = original->_size;
+
+    // routing state
+    _connection = original->_connection;
+    _orginalConnection = original->_orginalConnection;
+    _markedDelete = original->_markedDelete;
+
+    // share the claim with the original
+    _claim = original->_claim;
+    if (_claim != NULL)
+    {
+        _claim->increaseFlowFileRecordOwnedCount();
+    }
+
+    _snapshot = true;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/GenerateFlowFile.cpp 
b/libminifi/src/GenerateFlowFile.cpp
new file mode 100644
index 0000000..4b0603d
--- /dev/null
+++ b/libminifi/src/GenerateFlowFile.cpp
@@ -0,0 +1,134 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+
+#include "GenerateFlowFile.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Allowed values of the "Data Format" property
+const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
+const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
+//! Processor name
+const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile");
+//! Supported properties (name, description, default value)
+Property GenerateFlowFile::FileSize("File Size",
+               "The size of the file that will be used", "1 kB");
+Property GenerateFlowFile::BatchSize("Batch Size",
+               "The number of FlowFiles to be transferred in each invocation", "1");
+Property GenerateFlowFile::DataFormat("Data Format",
+               "Specifies whether the data should be Text or Binary",
+               GenerateFlowFile::DATA_FORMAT_BINARY);
+// The description previously stopped mid-sentence after "all FlowFiles"
+Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles",
+               "If true, each FlowFile that is generated will be unique. If false, "
+               "a random value will be generated and all FlowFiles will get the same content",
+               "true");
+//! Supported relationships
+Relationship GenerateFlowFile::Success("success",
+               "success operational on the flow record");
+
+//! Register the properties and relationships this processor supports.
+void GenerateFlowFile::initialize()
+{
+       //! Supported properties
+       std::set<Property> supportedProperties = {
+               FileSize,
+               BatchSize,
+               DataFormat,
+               UniqueFlowFiles
+       };
+       setSupportedProperties(supportedProperties);
+
+       //! Supported relationships
+       std::set<Relationship> supportedRelationships = { Success };
+       setSupportedRelationships(supportedRelationships);
+}
+
+//! Fill buf[0 .. size) with pseudo-random bytes taken from random().
+//! Each random() value supplies up to sizeof(int) bytes (low byte first) and
+//! the copy stops exactly at 'size', so a buffer whose length is not a
+//! multiple of sizeof(int) is never written past its end.
+static void fillRandomBytes(char *buf, int64_t size)
+{
+       int64_t pos = 0;
+       while (pos < size)
+       {
+               unsigned int bits = (unsigned int) random();
+               for (unsigned int b = 0; b < sizeof(int) && pos < size; b++, pos++)
+               {
+                       buf[pos] = (char) (bits & 0xFF);
+                       bits >>= 8;
+               }
+       }
+}
+
+//! Generate "Batch Size" flow files of "File Size" random bytes each and
+//! route them to the Success relationship.
+//!
+//! When "Unique FlowFiles" is true a fresh random payload is produced for
+//! every flow file. When it is false the payload is generated once, cached in
+//! _data/_dataSize and reused for all flow files, as the property description
+//! states. (The two branches used to be swapped.)
+//!
+//! If the session cannot create a flow file the method returns early.
+void GenerateFlowFile::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       int64_t batchSize = 1;
+       bool uniqueFlowFile = true;
+       int64_t fileSize = 1024;
+
+       std::string value;
+       if (context->getProperty(FileSize.getName(), value))
+       {
+               Property::StringToInt(value, fileSize);
+       }
+       if (context->getProperty(BatchSize.getName(), value))
+       {
+               Property::StringToInt(value, batchSize);
+       }
+       if (context->getProperty(UniqueFlowFiles.getName(), value))
+       {
+               Property::StringToBool(value, uniqueFlowFile);
+       }
+
+       // A negative size cannot be allocated; treat it as "no content"
+       if (fileSize < 0)
+               fileSize = 0;
+
+       if (uniqueFlowFile)
+       {
+               // The vector owns the scratch buffer, so it is released on every
+               // exit path, including the early return below
+               std::vector<char> buffer((size_t) fileSize);
+               uint64_t dataSize = fileSize;
+               GenerateFlowFile::WriteCallback callback(buffer.data(), dataSize);
+               for (int64_t i = 0; i < batchSize; i++)
+               {
+                       // For each batch
+                       FlowFileRecord *flowFile = session->create();
+                       if (!flowFile)
+                               return;
+                       if (fileSize > 0)
+                       {
+                               // New random content for every flow file
+                               fillRandomBytes(buffer.data(), fileSize);
+                               session->write(flowFile, &callback);
+                       }
+                       session->transfer(flowFile, Success);
+               }
+       }
+       else
+       {
+               if (!_data)
+               {
+                       // We have not created the shared data yet
+                       _data = new char[fileSize];
+                       _dataSize = fileSize;
+                       fillRandomBytes(_data, fileSize);
+               }
+               GenerateFlowFile::WriteCallback callback(_data, _dataSize);
+               for (int64_t i = 0; i < batchSize; i++)
+               {
+                       // For each batch
+                       FlowFileRecord *flowFile = session->create();
+                       if (!flowFile)
+                               return;
+                       if (fileSize > 0)
+                               session->write(flowFile, &callback);
+                       session->transfer(flowFile, Success);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/GetFile.cpp b/libminifi/src/GetFile.cpp
new file mode 100644
index 0000000..02e196a
--- /dev/null
+++ b/libminifi/src/GetFile.cpp
@@ -0,0 +1,295 @@
+/**
+ * @file GetFile.cpp
+ * GetFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <dirent.h>
+#include <limits.h>
+#include <unistd.h>
+#include <regex>
+
+#include "TimeUtil.h"
+#include "GetFile.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string GetFile::ProcessorName("GetFile");
+Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull 
in each iteration", "10");
+Property GetFile::Directory("Input Directory", "The input directory from which 
to pull files", ".");
+Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether 
or not hidden files should be ignored", "true");
+Property GetFile::KeepSourceFile("Keep Source File",
+               "If true, the file is not deleted after it has been copied to 
the Content Repository", "false");
+Property GetFile::MaxAge("Maximum File Age",
+               "The minimum age that a file must be in order to be pulled; any 
file younger than this amount of time (according to last modification date) 
will be ignored", "0 sec");
+Property GetFile::MinAge("Minimum File Age",
+               "The maximum age that a file must be in order to be pulled; any 
file older than this amount of time (according to last modification date) will 
be ignored", "0 sec");
+Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file 
can be in order to be pulled", "0 B");
+Property GetFile::MinSize("Minimum File Size", "The minimum size that a file 
must be in order to be pulled", "0 B");
+Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait 
before performing a directory listing", "0 sec");
+Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not 
to pull files from subdirectories", "true");
+Property GetFile::FileFilter("File Filter", "Only files whose names match the 
given regular expression will be picked up", "[^\\.].*");
+Relationship GetFile::Success("success", "All files are routed to success");
+
+//! Register the properties and relationships this processor supports.
+void GetFile::initialize()
+{
+       //! Supported properties
+       std::set<Property> supportedProperties = {
+               BatchSize,
+               Directory,
+               IgnoreHiddenFile,
+               KeepSourceFile,
+               MaxAge,
+               MinAge,
+               MaxSize,
+               MinSize,
+               PollInterval,
+               Recurse,
+               FileFilter
+       };
+       setSupportedProperties(supportedProperties);
+
+       //! Supported relationships
+       std::set<Relationship> supportedRelationships = { Success };
+       setSupportedRelationships(supportedRelationships);
+}
+
+//! Read the processor configuration, refresh the directory listing when it
+//! is empty and due, then turn up to _batchSize listed files into flow files
+//! routed to Success.
+//!
+//! For every file a flow file is created, its FILENAME, PATH and
+//! ABSOLUTE_PATH attributes are set, and the content is imported through the
+//! session (the source is kept or removed according to _keepSourceFile).
+//! Exceptions raised while processing are logged and rethrown to the caller.
+void GetFile::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       std::string value;
+       if (context->getProperty(Directory.getName(), value))
+       {
+               _directory = value;
+       }
+       if (context->getProperty(BatchSize.getName(), value))
+       {
+               Property::StringToInt(value, _batchSize);
+       }
+       if (context->getProperty(IgnoreHiddenFile.getName(), value))
+       {
+               Property::StringToBool(value, _ignoreHiddenFile);
+       }
+       if (context->getProperty(KeepSourceFile.getName(), value))
+       {
+               Property::StringToBool(value, _keepSourceFile);
+       }
+       if (context->getProperty(MaxAge.getName(), value))
+       {
+               // Parse "<number> <unit>" and normalize to milliseconds
+               TimeUnit unit;
+               if (Property::StringToTime(value, _maxAge, unit))
+                       Property::ConvertTimeUnitToMS(_maxAge, unit, _maxAge);
+       }
+       if (context->getProperty(MinAge.getName(), value))
+       {
+               TimeUnit unit;
+               if (Property::StringToTime(value, _minAge, unit))
+                       Property::ConvertTimeUnitToMS(_minAge, unit, _minAge);
+       }
+       if (context->getProperty(MaxSize.getName(), value))
+       {
+               Property::StringToInt(value, _maxSize);
+       }
+       if (context->getProperty(MinSize.getName(), value))
+       {
+               Property::StringToInt(value, _minSize);
+       }
+       if (context->getProperty(PollInterval.getName(), value))
+       {
+               TimeUnit unit;
+               if (Property::StringToTime(value, _pollInterval, unit))
+                       Property::ConvertTimeUnitToMS(_pollInterval, unit, _pollInterval);
+       }
+       if (context->getProperty(Recurse.getName(), value))
+       {
+               Property::StringToBool(value, _recursive);
+       }
+       if (context->getProperty(FileFilter.getName(), value))
+       {
+               _fileFilter = value;
+       }
+
+       // Perform directory list
+       if (isListingEmpty())
+       {
+               if (_pollInterval == 0 || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval)
+               {
+                       performListing(_directory);
+                       // Record when the listing ran; without this the polling
+                       // interval check above can never hold a listing back
+                       _lastDirectoryListingTime = getTimeMillis();
+               }
+       }
+
+       if (isListingEmpty())
+               return;
+
+       try
+       {
+               std::queue<std::string> list;
+               pollListing(list, _batchSize);
+               while (!list.empty())
+               {
+                       std::string fileName = list.front();
+                       list.pop();
+                       _logger->log_info("GetFile process %s", fileName.c_str());
+                       FlowFileRecord *flowFile = session->create();
+                       if (!flowFile)
+                               return;
+                       // Split the full path into directory and file name
+                       std::size_t found = fileName.find_last_of("/\\");
+                       std::string path = fileName.substr(0, found);
+                       std::string name = fileName.substr(found + 1);
+                       flowFile->updateAttribute(FILENAME, name);
+                       flowFile->updateAttribute(PATH, path);
+                       flowFile->addAttribute(ABSOLUTE_PATH, fileName);
+                       session->import(fileName, flowFile, _keepSourceFile);
+                       session->transfer(flowFile, Success);
+               }
+       }
+       catch (const std::exception &exception)
+       {
+               // Log and propagate; any other exception type propagates unchanged
+               _logger->log_debug("GetFile Caught Exception %s", exception.what());
+               throw;
+       }
+}
+
+//! Report whether the pending file listing is empty.
+//! The check is made while holding _mtx.
+bool GetFile::isListingEmpty()
+{
+       std::lock_guard<std::mutex> guard(_mtx);
+       const bool nothingQueued = _dirList.empty();
+       return nothingQueued;
+}
+
+//! Append a file name to the pending file listing.
+//! The listing is modified while holding _mtx.
+void GetFile::putListing(std::string fileName)
+{
+       std::lock_guard<std::mutex> guard(_mtx);
+       _dirList.push(fileName);
+}
+
+//! Move file names from the pending listing into 'list'.
+//! Entries are transferred in queue order while holding _mtx until the
+//! pending listing is empty or 'list' holds maxSize entries. A maxSize of
+//! zero means no limit. maxSize is compared as an unsigned value, so a
+//! negative maxSize acts as a very large limit.
+void GetFile::pollListing(std::queue<std::string> &list, int maxSize)
+{
+       std::lock_guard<std::mutex> guard(_mtx);
+
+       const bool unlimited = (maxSize == 0);
+       const std::queue<std::string>::size_type limit =
+               static_cast<std::queue<std::string>::size_type>(maxSize);
+
+       for (;;)
+       {
+               if (_dirList.empty())
+                       break;
+               if (!unlimited && list.size() >= limit)
+                       break;
+
+               std::string next = _dirList.front();
+               _dirList.pop();
+               list.push(next);
+       }
+}
+
+//! Decide whether the file at path 'fileName' should be picked up.
+//!
+//! A file is accepted only when all of the following hold:
+//!  - stat() succeeds;
+//!  - its size is within _minSize/_maxSize (a bound of 0 disables it);
+//!  - its age, measured from the last modification time, is within
+//!    _minAge/_maxAge (a bound of 0 disables it);
+//!  - it is not hidden (name starts with '.') when _ignoreHiddenFile is set;
+//!  - it is readable, and also writable unless _keepSourceFile is set;
+//!  - its name matches the _fileFilter regular expression.
+//!
+//! The hidden-file test and the filter are applied to the file name (the
+//! part after the last path separator), not to the whole path. Matching the
+//! whole path rejected every file under a directory such as "." because the
+//! path itself starts with a dot.
+//! An invalid filter expression is logged and the file is rejected.
+bool GetFile::acceptFile(std::string fileName)
+{
+       struct stat statbuf;
+
+       if (stat(fileName.c_str(), &statbuf) != 0)
+               return false;
+
+       if (_minSize > 0 && statbuf.st_size < _minSize)
+               return false;
+
+       if (_maxSize > 0 && statbuf.st_size > _maxSize)
+               return false;
+
+       uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
+       uint64_t now = getTimeMillis();
+       // A modification time in the future would make the unsigned
+       // subtraction wrap around; treat such a file as brand new instead
+       uint64_t fileAge = (now > modifiedTime) ? (now - modifiedTime) : 0;
+       if (_minAge > 0 && fileAge < _minAge)
+               return false;
+       if (_maxAge > 0 && fileAge > _maxAge)
+               return false;
+
+       // Name-based checks work on the last path component only
+       std::size_t separator = fileName.find_last_of("/\\");
+       std::string baseName = (separator == std::string::npos) ?
+               fileName : fileName.substr(separator + 1);
+
+       if (_ignoreHiddenFile && !baseName.empty() && baseName[0] == '.')
+               return false;
+
+       if (access(fileName.c_str(), R_OK) != 0)
+               return false;
+
+       // The source is removed after import unless it is kept, so it must be writable
+       if (_keepSourceFile == false && access(fileName.c_str(), W_OK) != 0)
+               return false;
+
+       try {
+               std::regex re(_fileFilter);
+               if (!std::regex_match(baseName, re)) {
+                       return false;
+               }
+       } catch (const std::regex_error &e) {
+               _logger->log_error("Invalid File Filter regex: %s.", e.what());
+               return false;
+       }
+
+       return true;
+}
+
+//! Scan directory 'dir' and queue every acceptable file for pickup.
+//!
+//! Sub-directories are descended into only when _recursive is set; the "."
+//! and ".." entries are never followed. Every other entry is offered to
+//! acceptFile() and, when accepted, added to the pending listing.
+//! A directory that cannot be opened is silently skipped.
+void GetFile::performListing(std::string dir)
+{
+       DIR *d = opendir(dir.c_str());
+       if (!d)
+               return;
+
+       struct dirent *entry;
+       while ((entry = readdir(d)) != NULL)
+       {
+               std::string d_name = entry->d_name;
+               std::string path = dir + "/" + d_name;
+
+               // d_type is an enumeration, not a bit mask, so it has to be
+               // compared for equality (DT_BLK, DT_SOCK, ... share bits with DT_DIR)
+               bool isDirectory = (entry->d_type == DT_DIR);
+               if (entry->d_type == DT_UNKNOWN)
+               {
+                       // Some file systems do not report the entry type; ask
+                       // lstat() instead, without following symbolic links
+                       struct stat statbuf;
+                       isDirectory = (lstat(path.c_str(), &statbuf) == 0 && S_ISDIR(statbuf.st_mode));
+               }
+
+               if (isDirectory)
+               {
+                       // if this is a directory
+                       if (_recursive && d_name != ".." && d_name != ".")
+                       {
+                               performListing(path);
+                       }
+               }
+               else if (acceptFile(path))
+               {
+                       // check whether we can take this file
+                       putListing(path);
+               }
+       }
+       closedir(d);
+}

Reply via email to