MINIFI-190: Creating initial commit with changes

Resolves MINIFI-194 by using a semaphore in place of the
FlowController instance. Stop is performed outside of the
signal handler to avoid synchronicity issues.

Resolves MINIFI-192 by using lock_guard based on a conditional

Resolves issues found with MINIFI-190 regarding GetFile. Added
pragma definitions for GCC < 4.9

This closes #47.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/09d973ba
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/09d973ba
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/09d973ba

Branch: refs/heads/master
Commit: 09d973bafb7d798a1348e3527f3484ca9fca26be
Parents: 63dbb82
Author: Marc <[email protected]>
Authored: Thu Feb 9 20:40:32 2017 -0500
Committer: Aldrin Piri <[email protected]>
Committed: Sun Feb 12 15:47:34 2017 -0500

----------------------------------------------------------------------
 .travis.yml                                |    2 +-
 CMakeLists.txt                             |    2 -
 libminifi/include/Configure.h              |    2 +
 libminifi/include/FlowController.h         |  238 +++--
 libminifi/include/Logger.h                 |   32 +
 libminifi/include/Processor.h              |    2 +-
 libminifi/include/Provenance.h             |  219 +----
 libminifi/include/ResourceClaim.h          |    6 +-
 libminifi/include/Serializable.h           |  294 ++++++
 libminifi/src/Configure.cpp                |    2 +
 libminifi/src/FlowControlProtocol.cpp      |   21 -
 libminifi/src/FlowController.cpp           | 1103 ++++++++++++-----------
 libminifi/src/FlowFileRecord.cpp           |    2 +-
 libminifi/src/GetFile.cpp                  |   55 +-
 libminifi/src/ProcessGroup.cpp             |   14 +-
 libminifi/src/ProcessSession.cpp           |   57 +-
 libminifi/src/Processor.cpp                |  108 +--
 libminifi/src/Provenance.cpp               |  822 ++++++-----------
 libminifi/src/PutFile.cpp                  |    2 +
 libminifi/src/ResourceClaim.cpp            |    4 +
 libminifi/src/Serializable.cpp             |  365 ++++++++
 libminifi/src/Site2SitePeer.cpp            |    2 +-
 libminifi/test/Server.cpp                  |    1 -
 libminifi/test/TestBase.h                  |   62 +-
 libminifi/test/unit/ProcessorTests.h       |   72 +-
 libminifi/test/unit/ProvenanceTestHelper.h |   70 ++
 libminifi/test/unit/ProvenanceTests.h      |   66 +-
 main/MiNiFiMain.cpp                        |   98 +-
 28 files changed, 2230 insertions(+), 1493 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 91ef329..b64902f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,7 +22,7 @@ script:
   # Establish updated toolchain as default
   - sudo unlink /usr/bin/gcc && sudo ln -s /usr/bin/gcc-4.8 /usr/bin/gcc
   - sudo unlink /usr/bin/g++ && sudo ln -s /usr/bin/g++-4.8 /usr/bin/g++
-  - mkdir ./build && cd ./build && cmake .. && make && make test
+  - mkdir ./build && cd ./build && cmake .. && make && ./tests
 
 addons:
   apt:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9900e2f..f19431e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -60,10 +60,8 @@ endif()
 list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
 
 find_package(UUID REQUIRED)
-
 file(GLOB SPD_SOURCES "include/spdlog/*")
 
-
 add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3)
 add_subdirectory(libminifi)
 add_subdirectory(main)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Configure.h b/libminifi/include/Configure.h
index 00b7742..f4aae3c 100644
--- a/libminifi/include/Configure.h
+++ b/libminifi/include/Configure.h
@@ -44,6 +44,8 @@ public:
        static const char *nifi_flow_configuration_file;
        static const char *nifi_administrative_yield_duration;
        static const char *nifi_bored_yield_duration;
+       static const char *nifi_graceful_shutdown_seconds;
+       static const char *nifi_log_level;
        static const char *nifi_server_name;
        static const char *nifi_server_port;
        static const char *nifi_server_report_interval;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index b98393e..02c39b1 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -28,7 +28,7 @@
 #include <atomic>
 #include <algorithm>
 #include <set>
-#include <yaml-cpp/yaml.h>
+#include "yaml-cpp/yaml.h"
 
 #include "Configure.h"
 #include "Property.h"
@@ -75,21 +75,15 @@ struct ProcessorConfig {
        std::vector<Property> properties;
 };
 
-//! FlowController Class
-class FlowController
-{
+/**
+ * Flow Controller class. Generally used by FlowController factory
+ * as a singleton.
+ */
+class FlowController {
 public:
-    static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10;
-    static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5;
-       //! Get the singleton flow controller
-       static FlowController * getFlowController()
-       {
-               if (!_flowController)
-               {
-                       _flowController = new FlowController();
-               }
-               return _flowController;
-       }
+       static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10;
+       static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5;
+
        //! passphase for the private file callback
        static int pemPassWordCb(char *buf, int size, int rwflag, void 
*userdata)
        {
@@ -111,96 +105,104 @@ public:
        }
 
        //! Destructor
-       virtual ~FlowController();
+       virtual ~FlowController(){
+         if (_ctx)
+           SSL_CTX_free(_ctx);
+       }
        //! Set FlowController Name
-       void setName(std::string name) {
+       virtual void setName(std::string name) {
                _name = name;
        }
        //! Get Flow Controller Name
-       std::string getName(void) {
+       virtual std::string getName(void) {
                return (_name);
        }
        //! Set UUID
-       void setUUID(uuid_t uuid) {
+       virtual void setUUID(uuid_t uuid) {
                uuid_copy(_uuid, uuid);
        }
        //! Get UUID
-       bool getUUID(uuid_t uuid) {
-               if (uuid)
-               {
+       virtual bool getUUID(uuid_t uuid) {
+               if (uuid) {
                        uuid_copy(uuid, _uuid);
                        return true;
-               }
-               else
+               } else
                        return false;
        }
        //! Set MAX TimerDrivenThreads
-       void setMaxTimerDrivenThreads(int number)
-       {
+       virtual void setMaxTimerDrivenThreads(int number) {
                _maxTimerDrivenThreads = number;
        }
        //! Get MAX TimerDrivenThreads
-       int getMaxTimerDrivenThreads()
-       {
+       virtual int getMaxTimerDrivenThreads() {
                return _maxTimerDrivenThreads;
        }
        //! Set MAX EventDrivenThreads
-       void setMaxEventDrivenThreads(int number)
-       {
+       virtual void setMaxEventDrivenThreads(int number) {
                _maxEventDrivenThreads = number;
        }
        //! Get MAX EventDrivenThreads
-       int getMaxEventDrivenThreads()
-       {
+       virtual int getMaxEventDrivenThreads() {
                return _maxEventDrivenThreads;
        }
        //! Get the provenance repository
-       ProvenanceRepository *getProvenanceRepository()
-       {
+       virtual ProvenanceRepository *getProvenanceRepository() {
                return this->_provenanceRepo;
        }
-       //! Life Cycle related function
-       //! Load flow YAML from disk, after that, create the root process group 
and its children, initialize the flows
-       void load();
+       //! Load flow xml from disk, after that, create the root process group 
and its children, initialize the flows
+       virtual void load() = 0;
+
        //! Whether the Flow Controller is start running
-       bool isRunning();
-       //! Whether the Flow Controller has already been initialized (loaded 
flow YAML)
-       bool isInitialized();
+       virtual bool isRunning() {
+               return _running.load();
+       }
+       //! Whether the Flow Controller has already been initialized (loaded 
flow XML)
+       virtual bool isInitialized() {
+               return _initialized.load();
+       }
        //! Start to run the Flow Controller which internally start the root 
process group and all its children
-       bool start();
-       //! Stop to run the Flow Controller which internally stop the root 
process group and all its children
-       void stop(bool force);
-       //! reload flow controller's configuration
-       void reload(std::string yamlFile);
+       virtual bool start() = 0;
        //! Unload the current flow YAML, clean the root process group and all 
its children
-       void unload();
+       virtual void stop(bool force) = 0;
+       //! Asynchronous function trigger unloading and wait for a period of 
time
+       virtual void waitUnload(const uint64_t timeToWaitMs) = 0;
+       //! Unload the current flow xml, clean the root process group and all 
its children
+       virtual void unload() = 0;
+       //! Load new xml
+       virtual void reload(std::string yamlFile) = 0;
        //! update property value
-       void updatePropertyValue(std::string processorName, std::string 
propertyName, std::string propertyValue)
-       {
+       void updatePropertyValue(std::string processorName,
+                       std::string propertyName, std::string propertyValue) {
                if (_root)
-                       _root->updatePropertyValue(processorName, propertyName, 
propertyValue);
+                       _root->updatePropertyValue(processorName, propertyName,
+                                       propertyValue);
        }
 
        //! Create Processor (Node/Input/Output Port) based on the name
-       Processor *createProcessor(std::string name, uuid_t uuid);
+       virtual Processor *createProcessor(std::string name, uuid_t uuid) = 0;
        //! Create Root Processor Group
-       ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid);
+       virtual ProcessGroup *createRootProcessGroup(std::string name, uuid_t 
uuid) = 0;
        //! Create Remote Processor Group
-       ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid);
+       virtual ProcessGroup *createRemoteProcessGroup(std::string name,
+                       uuid_t uuid) = 0;
        //! Create Connection
-       Connection *createConnection(std::string name, uuid_t uuid);
+       virtual Connection *createConnection(std::string name, uuid_t uuid) = 0;
        //! set 8 bytes SerialNumber
-       void setSerialNumber(uint8_t *number)
-       {
+       virtual void setSerialNumber(uint8_t *number) {
                _protocol->setSerialNumber(number);
        }
+
+       
        //! getSSLContext
-       SSL_CTX *getSSLContext()
+       virtual SSL_CTX *getSSLContext()
        {
                return _ctx;
        }
 
 protected:
+  
+       //! SSL context
+       SSL_CTX *_ctx;
 
        //! A global unique identifier
        uuid_t _uuid;
@@ -218,6 +220,10 @@ protected:
        int _maxEventDrivenThreads;
        //! Config
        //! FlowFile Repo
+       //! Whether it is running
+       std::atomic<bool> _running;
+       //! Whether it has already been initialized (load the flow XML already)
+       std::atomic<bool> _initialized;
        //! Provenance Repo
        ProvenanceRepository *_provenanceRepo;
        //! Flow Engines
@@ -231,50 +237,130 @@ protected:
        //! Heart Beat
        //! FlowControl Protocol
        FlowControlProtocol *_protocol;
-       //! SSL context
-       SSL_CTX *_ctx;
+       
+
+       FlowController() :
+                       _root(0), _maxTimerDrivenThreads(0), 
_maxEventDrivenThreads(0), _running(
+                                       false), _initialized(false), 
_provenanceRepo(0), _protocol(
+                                       0), _logger(Logger::getLogger()), 
_ctx(NULL){
+       }
 
 private:
 
+       //! Logger
+       Logger *_logger;
+
+};
+
+/**
+ * Flow Controller implementation that defines the typical flow.
+ * of events.
+ */
+class FlowControllerImpl: public FlowController {
+public:
+
+       //! Destructor
+       virtual ~FlowControllerImpl();
+
+       //! Life Cycle related function
+       //! Load flow xml from disk, after that, create the root process group 
and its children, initialize the flows
+       void load();
+       //! Start to run the Flow Controller which internally start the root 
process group and all its children
+       bool start();
+       //! Stop to run the Flow Controller which internally stop the root 
process group and all its children
+       void stop(bool force);
+       //! Asynchronous function trigger unloading and wait for a period of 
time
+       void waitUnload(const uint64_t timeToWaitMs);
+       //! Unload the current flow xml, clean the root process group and all 
its children
+       void unload();
+       //! Load new xml
+       void reload(std::string yamlFile);
+       //! update property value
+       void updatePropertyValue(std::string processorName,
+                       std::string propertyName, std::string propertyValue) {
+               if (_root)
+                       _root->updatePropertyValue(processorName, propertyName,
+                                       propertyValue);
+       }
+
+       //! Create Processor (Node/Input/Output Port) based on the name
+       Processor *createProcessor(std::string name, uuid_t uuid);
+       //! Create Root Processor Group
+       ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid);
+       //! Create Remote Processor Group
+       ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid);
+       //! Create Connection
+       Connection *createConnection(std::string name, uuid_t uuid);
+
+       //! Constructor
+       /*!
+        * Create a new Flow Controller
+        */
+       FlowControllerImpl(std::string name = DEFAULT_ROOT_GROUP_NAME);
+
+       
+       
+       
+       friend class FlowControlFactory;
+
+private:
+  
+       
+
        //! Mutex for protection
        std::mutex _mtx;
        //! Logger
        Logger *_logger;
-       //! Configure
        Configure *_configure;
-       //! Whether it is running
-       std::atomic<bool> _running;
-       //! Whether it has already been initialized (load the flow YAML already)
-       std::atomic<bool> _initialized;
 
        //! Process Processor Node YAML
        void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup 
*parent);
        //! Process Port YAML
-       void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, 
TransferDirection direction);
+       void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent,
+                       TransferDirection direction);
        //! Process Root Processor Group YAML
        void parseRootProcessGroupYaml(YAML::Node rootNode);
        //! Process Property YAML
-       void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, 
Processor *processor);
+       void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node,
+                       Processor *processor);
        //! Process connection YAML
        void parseConnectionYaml(YAML::Node *node, ProcessGroup *parent);
        //! Process Remote Process Group YAML
        void parseRemoteProcessGroupYaml(YAML::Node *node, ProcessGroup 
*parent);
        //! Parse Properties Node YAML for a processor
-       void parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor 
*processor);
-
-       static FlowController *_flowController;
-
-       //! Constructor
-       /*!
-        * Create a new Flow Controller
-        */
-       FlowController(std::string name = DEFAULT_ROOT_GROUP_NAME);
+       void parsePropertiesNodeYaml(YAML::Node *propertiesNode,
+                       Processor *processor);
 
        // Prevent default copy constructor and assignment operation
        // Only support pass by reference or pointer
-       FlowController(const FlowController &parent);
-       FlowController &operator=(const FlowController &parent);
+       FlowControllerImpl(const FlowController &parent);
+       FlowControllerImpl &operator=(const FlowController &parent);
+
+};
 
+/**
+ * Flow Controller factory that creates flow controllers or gets the
+ * assigned instance.
+ */
+class FlowControllerFactory {
+public:
+       //! Get the singleton flow controller
+       static FlowController * getFlowController(FlowController *instance = 0) 
{
+               if (!_flowController) {
+                       if (NULL == instance)
+                               _flowController = createFlowController();
+                       else
+                               _flowController = instance;
+               }
+               return _flowController;
+       }
+
+       //! Get the singleton flow controller
+       static FlowController * createFlowController() {
+                return dynamic_cast<FlowController*>(new FlowControllerImpl());
+       }
+private:
+       static FlowController *_flowController;
 };
 
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/Logger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Logger.h b/libminifi/include/Logger.h
index 621df78..38d6a97 100644
--- a/libminifi/include/Logger.h
+++ b/libminifi/include/Logger.h
@@ -21,6 +21,8 @@
 #ifndef __LOGGER_H__
 #define __LOGGER_H__
 
+#include <string>
+#include <algorithm>
 #include <cstdio>
 #include "spdlog/spdlog.h"
 
@@ -72,6 +74,36 @@ public:
                        return;
                _spdlog->set_level((spdlog::level::level_enum) level);
        }
+
+       void setLogLevel(const std::string &level,LOG_LEVEL_E defaultLevel = 
info )
+       {
+               std::string logLevel = "";
+               std::transform(level.begin(), level.end(), logLevel.end(), 
::tolower);
+
+               if (logLevel == "trace") {
+                       setLogLevel(trace);
+               } else if (logLevel == "debug") {
+                       setLogLevel(debug);
+               } else if (logLevel == "info") {
+                       setLogLevel(info);
+               } else if (logLevel == "notice") {
+                       setLogLevel(notice);
+               } else if (logLevel == "warn") {
+                       setLogLevel(warn);
+               } else if (logLevel == "error") {
+                       setLogLevel(err);
+               } else if (logLevel == "critical") {
+                       setLogLevel(critical);
+               } else if (logLevel == "alert") {
+                       setLogLevel(alert);
+               } else if (logLevel == "emerg") {
+                       setLogLevel(emerg);
+               } else if (logLevel == "off") {
+                       setLogLevel(off);
+               } else {
+                       setLogLevel(defaultLevel);
+               }
+       }
        //! Destructor
        ~Logger() {}
        /**

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Processor.h b/libminifi/include/Processor.h
index 35bf040..a0df577 100644
--- a/libminifi/include/Processor.h
+++ b/libminifi/include/Processor.h
@@ -343,7 +343,7 @@ private:
        //! Incoming connection Iterator
        std::set<Connection *>::iterator _incomingConnectionsIter;
        //! Condition for whether there is incoming work to do
-       bool _hasWork = false;
+       std::atomic<bool> _hasWork;
        //! Concurrent condition mutex for whether there is incoming work to do
        std::mutex _workAvailableMtx;
        //! Concurrent condition variable for whether there is incoming work to 
do

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Provenance.h b/libminifi/include/Provenance.h
index f3e814f..7507c03 100644
--- a/libminifi/include/Provenance.h
+++ b/libminifi/include/Provenance.h
@@ -20,33 +20,30 @@
 #ifndef __PROVENANCE_H__
 #define __PROVENANCE_H__
 
-#include <stdio.h>
-#include <errno.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/stat.h>
+#include <ftw.h>
 #include <uuid/uuid.h>
-#include <vector>
-#include <queue>
-#include <map>
-#include <mutex>
 #include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <map>
 #include <set>
-#include <cassert>
-#include <errno.h>
-#include <chrono>
+#include <string>
 #include <thread>
-#include <ftw.h>
-#include "leveldb/db.h"
+#include <vector>
 
-#include "TimeUtil.h"
-#include "Logger.h"
+#include "leveldb/db.h"
+#include "leveldb/options.h"
+#include "leveldb/slice.h"
+#include "leveldb/status.h"
 #include "Configure.h"
-#include "Property.h"
-#include "ResourceClaim.h"
-#include "Relationship.h"
 #include "Connection.h"
 #include "FlowFileRecord.h"
+#include "Logger.h"
+#include "Property.h"
+#include "ResourceClaim.h"
+#include "TimeUtil.h"
+#include "Serializable.h"
 
 // Provenance Event Record Serialization Seg Size
 #define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
@@ -54,7 +51,7 @@
 class ProvenanceRepository;
 
 //! Provenance Event Record
-class ProvenanceEventRecord
+class ProvenanceEventRecord : protected Serializable
 {
 public:
        enum ProvenanceEventType {
@@ -176,17 +173,11 @@ public:
                uuid_generate(_eventId);
                uuid_unparse(_eventId, eventIdStr);
                _eventIdStr = eventIdStr;
-               _serializedBuf = NULL;
-               _serializeBufSize = 0;
-               _maxSerializeBufSize = 0;
                _logger = Logger::getLogger();
        }
 
        ProvenanceEventRecord() {
                        _eventTime = getTimeMillis();
-                       _serializedBuf = NULL;
-                       _serializeBufSize = 0;
-                       _maxSerializeBufSize = 0;
                        _logger = Logger::getLogger();
        }
 
@@ -399,7 +390,12 @@ public:
        //! Serialize and Persistent to the repository
        bool Serialize(ProvenanceRepository *repo);
        //! DeSerialize
-       bool DeSerialize(uint8_t *buffer, int bufferSize);
+       bool DeSerialize(const uint8_t *buffer, const int bufferSize);
+       //! DeSerialize
+       bool DeSerialize(DataStream &stream)
+       {
+               return DeSerialize(stream.getBuffer(),stream.getSize());
+       }
        //! DeSerialize
        bool DeSerialize(ProvenanceRepository *repo, std::string key);
 
@@ -456,151 +452,7 @@ private:
 
        //! Logger
        Logger *_logger;
-       // All serialization related method and internal buf
-       uint8_t *_serializedBuf;
-       int _serializeBufSize;
-       int _maxSerializeBufSize;
-       int writeData(uint8_t *value, int size)
-       {
-               if ((_serializeBufSize + size) > _maxSerializeBufSize)
-               {
-                       // if write exceed
-                       uint8_t *buffer = new uint8_t[_maxSerializeBufSize + 
PROVENANCE_EVENT_RECORD_SEG_SIZE];
-                       if (!buffer)
-                       {
-                               return -1;
-                       }
-                       memcpy(buffer, _serializedBuf, _serializeBufSize);
-                       delete[] _serializedBuf;
-                       _serializedBuf = buffer;
-                       _maxSerializeBufSize = _maxSerializeBufSize + 
PROVENANCE_EVENT_RECORD_SEG_SIZE;
-               }
-               uint8_t *bufPtr = _serializedBuf + _serializeBufSize;
-               memcpy(bufPtr, value, size);
-               _serializeBufSize += size;
-               return size;
-       }
-       int readData(uint8_t *buf, int buflen)
-       {
-               if ((buflen + _serializeBufSize) > _maxSerializeBufSize)
-               {
-                       // if read exceed
-                       return -1;
-               }
-               uint8_t *bufPtr = _serializedBuf + _serializeBufSize;
-               memcpy(buf, bufPtr, buflen);
-               _serializeBufSize += buflen;
-               return buflen;
-       }
-       int write(uint8_t value)
-       {
-               return writeData(&value, 1);
-       }
-       int write(char value)
-       {
-               return writeData((uint8_t *)&value, 1);
-       }
-       int write(uint32_t value)
-       {
-               uint8_t temp[4];
-
-               temp[0] = (value & 0xFF000000) >> 24;
-               temp[1] = (value & 0x00FF0000) >> 16;
-               temp[2] = (value & 0x0000FF00) >> 8;
-               temp[3] = (value & 0x000000FF);
-               return writeData(temp, 4);
-       }
-       int write(uint16_t value)
-       {
-               uint8_t temp[2];
-               temp[0] = (value & 0xFF00) >> 8;
-               temp[1] = (value & 0xFF);
-               return writeData(temp, 2);
-       }
-       int write(uint8_t *value, int len)
-       {
-               return writeData(value, len);
-       }
-       int write(uint64_t value)
-       {
-               uint8_t temp[8];
-
-               temp[0] = (value >> 56) & 0xFF;
-               temp[1] = (value >> 48) & 0xFF;
-               temp[2] = (value >> 40) & 0xFF;
-               temp[3] = (value >> 32) & 0xFF;
-               temp[4] = (value >> 24) & 0xFF;
-               temp[5] = (value >> 16) & 0xFF;
-               temp[6] = (value >>  8) & 0xFF;
-               temp[7] = (value >>  0) & 0xFF;
-               return writeData(temp, 8);
-       }
-       int write(bool value)
-       {
-               uint8_t temp = value;
-               return write(temp);
-       }
-       int writeUTF(std::string str, bool widen = false);
-       int read(uint8_t &value)
-       {
-               uint8_t buf;
-
-               int ret = readData(&buf, 1);
-               if (ret == 1)
-                       value = buf;
-               return ret;
-       }
-       int read(uint16_t &value)
-       {
-               uint8_t buf[2];
-
-               int ret = readData(buf, 2);
-               if (ret == 2)
-                       value = (buf[0] << 8) | buf[1];
-               return ret;
-       }
-       int read(char &value)
-       {
-               uint8_t buf;
-
-               int ret = readData(&buf, 1);
-               if (ret == 1)
-                       value = (char) buf;
-               return ret;
-       }
-       int read(uint8_t *value, int len)
-       {
-               return readData(value, len);
-       }
-       int read(uint32_t &value)
-       {
-               uint8_t buf[4];
-
-               int ret = readData(buf, 4);
-               if (ret == 4)
-                       value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) 
| buf[3];
-               return ret;
-       }
-       int read(uint64_t &value)
-       {
-               uint8_t buf[8];
-
-               int ret = readData(buf, 8);
-               if (ret == 8)
-               {
-                       value = ((uint64_t) buf[0] << 56) |
-                                       ((uint64_t) (buf[1] & 255) << 48) |
-                                       ((uint64_t) (buf[2] & 255) << 40) |
-                                       ((uint64_t) (buf[3] & 255) << 32) |
-                                       ((uint64_t) (buf[4] & 255) << 24) |
-                                       ((uint64_t) (buf[5] & 255) << 16) |
-                                       ((uint64_t) (buf[6] & 255) <<  8) |
-                                       ((uint64_t) (buf[7] & 255) <<  0);
-               }
-               return ret;
-       }
-       int readUTF(std::string &str, bool widen = false);
-
+       
        // Prevent default copy constructor and assignment operation
        // Only support pass by reference or pointer
        ProvenanceEventRecord(const ProvenanceEventRecord &parent);
@@ -649,10 +501,9 @@ public:
        //! clear
        void clear()
        {
-               for (std::set<ProvenanceEventRecord*>::iterator it = 
_events.begin(); it != _events.end(); ++it)
+               for (auto it : _events)
                {
-                       ProvenanceEventRecord *event = (ProvenanceEventRecord 
*) (*it);
-                       delete event;
+                       delete it;
                }
                _events.clear();
        }
@@ -733,6 +584,7 @@ public:
                _purgePeriod = PROVENANCE_PURGE_PERIOD;
                _maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE;
                _db = NULL;
+               _thread = NULL;
                _running = false;
                _repoFull = false;
        }
@@ -746,7 +598,7 @@ public:
        }
 
        //! initialize
-       bool initialize()
+       virtual bool initialize()
        {
                std::string value;
                if 
(_configure->get(Configure::nifi_provenance_repository_directory_default, 
value))
@@ -786,7 +638,7 @@ public:
                return true;
        }
        //! Put
-       bool Put(std::string key, uint8_t *buf, int bufLen)
+       virtual bool Put(std::string key, uint8_t *buf, int bufLen)
        {
                // persistent to the DB
                leveldb::Slice value((const char *) buf, bufLen);
@@ -798,7 +650,7 @@ public:
                        return false;
        }
        //! Delete
-       bool Delete(std::string key)
+       virtual bool Delete(std::string key)
        {
                leveldb::Status status;
                status = _db->Delete(leveldb::WriteOptions(), key);
@@ -808,7 +660,7 @@ public:
                        return false;
        }
        //! Get
-       bool Get(std::string key, std::string &value)
+       virtual bool Get(std::string key, std::string &value)
        {
                leveldb::Status status;
                status = _db->Get(leveldb::ReadOptions(), key, &value);
@@ -839,11 +691,11 @@ public:
        //! Run function for the thread
        static void run(ProvenanceRepository *repo);
        //! Start the repository monitor thread
-       void start();
+       virtual void start();
        //! Stop the repository monitor thread
-       void stop();
+       virtual void stop();
        //! whether the repo is full
-       bool isFull()
+       virtual bool isFull()
        {
                return _repoFull;
        }
@@ -859,8 +711,8 @@ private:
        //! Logger
        Logger *_logger;
        //! Configure
-       Configure *_configure;
        //! max db entry life time
+       Configure *_configure;
        int64_t _maxPartitionMillis;
        //! max db size
        int64_t _maxPartitionBytes;
@@ -899,4 +751,5 @@ private:
 
 
 
+
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/ResourceClaim.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ResourceClaim.h 
b/libminifi/include/ResourceClaim.h
index 098b259..b4085f2 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -32,15 +32,19 @@
 //! Default content directory
 #define DEFAULT_CONTENT_DIRECTORY "./content_repository"
 
+
+
 //! ResourceClaim Class
 class ResourceClaim {
 
 public:
+  
+       static std::string default_directory_path;
        //! Constructor
        /*!
         * Create a new resource claim
         */
-       ResourceClaim(const std::string contentDirectory);
+       ResourceClaim(const std::string contentDirectory = 
default_directory_path);
        //! Destructor
        virtual ~ResourceClaim() {}
        //! increaseFlowFileRecordOwnedCount

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/Serializable.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Serializable.h b/libminifi/include/Serializable.h
new file mode 100644
index 0000000..c32dee0
--- /dev/null
+++ b/libminifi/include/Serializable.h
@@ -0,0 +1,294 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SERIALIZABLE_H__
+#define __SERIALIZABLE_H__
+
+#include <cstdint>
+#include <vector>
+#include <string>
+
+
+/**
+ * Mechanism to determine endianness of host.
+ * Accounts for only BIG/LITTLE/BIENDIAN
+ **/
+class EndiannessCheck
+{
+public:
+    static bool IS_LITTLE;
+private:
+
+    static bool is_little_endian() {
+        /* do whatever is needed at static init time */
+        unsigned int x = 1;
+        char *c = (char*) &x;
+        IS_LITTLE=*c==1;
+        return IS_LITTLE;
+    }
+};
+
+
+/**
+ * DataStream defines the mechanism through which
+ * binary data will be written to a sink
+ */
+class DataStream
+{
+public:
+
+    DataStream() : readBuffer(0)
+    {
+
+    }
+
+    /**
+     * Constructor
+     **/
+    DataStream(const uint8_t *buf, const uint32_t buflen) : DataStream()
+    {
+        writeData((uint8_t*)buf,buflen);
+
+    }
+
+       /**
+        * Reads data and places it into buf
+        * @param buf buffer in which we extract data
+        * @param buflen
+        */
+       int readData(std::vector<uint8_t> &buf, int buflen);
+       /**
+        * Reads data and places it into buf
+        * @param buf buffer in which we extract data
+        * @param buflen
+        */
+    int readData(uint8_t *buf, int buflen);
+
+    /**
+     * writes valiue to buffer
+     * @param value value to write
+     * @param size size of value
+     */
+    int writeData(uint8_t *value, int size);
+
+
+    /**
+     * Reads a system word
+     * @param value value to write
+     */
+    inline int readLongLong(uint64_t &value, bool is_little_endian =
+                                EndiannessCheck::IS_LITTLE);
+
+
+    /**
+     * Reads a uint32_t
+     * @param value value to write
+     */
+    inline int readLong(uint32_t &value, bool is_little_endian =
+                            EndiannessCheck::IS_LITTLE);
+
+
+    /**
+     * Reads a system short
+     * @param value value to write
+     */
+    inline int readShort(uint16_t &value,bool is_little_endian =
+                             EndiannessCheck::IS_LITTLE);
+
+
+    /**
+     * Returns the underlying buffer
+     * @return vector's array
+     **/
+    const uint8_t *getBuffer() const
+    {
+        return &buffer[0];
+    }
+
+    /**
+     * Retrieve size of data stream
+     * @return size of data stream
+     **/
+    const uint32_t getSize() const
+    {
+        return buffer.size();
+    }
+
+private:
+    // All serialization related method and internal buf
+    std::vector<uint8_t> buffer;
+    uint32_t readBuffer;
+};
+
+/**
+ * Serializable instances provide base functionality to
+ * write certain objects/primitives to a data stream.
+ *
+ */
+class Serializable {
+
+public:
+
+    /**
+     * Inline function to write T to stream
+     **/
+    template<typename T>
+    inline int writeData(const T &t,DataStream *stream);
+
+    /**
+     * Inline function to write T to to_vec
+     **/
+    template<typename T>
+    inline int writeData(const T &t, uint8_t *to_vec);
+
+    /**
+     * Inline function to write T to to_vec
+     **/
+    template<typename T>
+    inline int writeData(const T &t, std::vector<uint8_t> &to_vec);
+
+
+    /**
+     * write byte to stream
+     * @return resulting write size
+     **/
+    int write(uint8_t value,DataStream *stream);
+
+    /**
+     * write byte to stream
+     * @return resulting write size
+     **/
+    int write(char value,DataStream *stream);
+
+    /**
+     * write 4 bytes to stream
+     * @param base_value non encoded value
+     * @param stream output stream
+     * @param is_little_endian endianness determination
+     * @return resulting write size
+     **/
+    int write(uint32_t base_value,DataStream *stream, bool is_little_endian =
+                  EndiannessCheck::IS_LITTLE);
+
+    /**
+     * write 2 bytes to stream
+     * @param base_value non encoded value
+     * @param stream output stream
+     * @param is_little_endian endianness determination
+     * @return resulting write size
+     **/
+    int write(uint16_t base_value,DataStream *stream, bool is_little_endian =
+                  EndiannessCheck::IS_LITTLE);
+
+    /**
+     * write valueto stream
+     * @param value non encoded value
+     * @param len length of value
+     * @param strema output stream
+     * @return resulting write size
+     **/
+    int write(uint8_t *value, int len,DataStream *stream);
+
+    /**
+     * write 8 bytes to stream
+     * @param base_value non encoded value
+     * @param stream output stream
+     * @param is_little_endian endianness determination
+     * @return resulting write size
+     **/
+    int write(uint64_t base_value,DataStream *stream, bool is_little_endian =
+                  EndiannessCheck::IS_LITTLE);
+
+    /**
+    * write bool to stream
+     * @param value non encoded value
+     * @return resulting write size
+     **/
+    int write(bool value);
+
+    /**
+     * write UTF string to stream
+     * @param str string to write
+     * @return resulting write size
+     **/
+    int writeUTF(std::string str,DataStream *stream, bool widen = false);
+
+    /**
+     * reads a byte from the stream
+     * @param value reference in which will set the result
+     * @param stream stream from which we will read
+     * @return resulting read size
+     **/
+    int read(uint8_t &value,DataStream *stream);
+
+    /**
+     * reads two bytes from the stream
+     * @param value reference in which will set the result
+     * @param stream stream from which we will read
+     * @return resulting read size
+     **/
+    int read(uint16_t &base_value,DataStream *stream, bool is_little_endian =
+                 EndiannessCheck::IS_LITTLE);
+
+    /**
+     * reads a byte from the stream
+     * @param value reference in which will set the result
+     * @param stream stream from which we will read
+     * @return resulting read size
+     **/
+    int read(char &value,DataStream *stream);
+
+    /**
+     * reads a byte array from the stream
+     * @param value reference in which will set the result
+     * @param len length to read
+     * @param stream stream from which we will read
+     * @return resulting read size
+     **/
+    int read(uint8_t *value, int len,DataStream *stream);
+
+    /**
+     * reads four bytes from the stream
+     * @param value reference in which will set the result
+     * @param stream stream from which we will read
+     * @return resulting read size
+     **/
+    int read(uint32_t &value,DataStream *stream,
+             bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+    /**
+     * reads eight byte from the stream
+     * @param value reference in which will set the result
+     * @param stream stream from which we will read
+     * @return resulting read size
+     **/
+    int read(uint64_t &value,DataStream *stream,
+             bool is_little_endian = EndiannessCheck::IS_LITTLE);
+
+    /**
+     * read UTF from stream
+     * @param str reference string
+     * @param stream stream from which we will read
+     * @return resulting read size
+     **/
+    int readUTF(std::string &str,DataStream *stream, bool widen = false);
+
+protected:
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 2652e35..61e782b 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -23,6 +23,8 @@ Configure *Configure::_configure(NULL);
 const char *Configure::nifi_flow_configuration_file = 
"nifi.flow.configuration.file";
 const char *Configure::nifi_administrative_yield_duration = 
"nifi.administrative.yield.duration";
 const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
+const char *Configure::nifi_graceful_shutdown_seconds  = 
"nifi.graceful.shutdown.seconds";
+const char *Configure::nifi_log_level = "nifi.log.level";
 const char *Configure::nifi_server_name = "nifi.server.name";
 const char *Configure::nifi_server_port = "nifi.server.port";
 const char *Configure::nifi_server_report_interval= 
"nifi.server.report.interval";

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowControlProtocol.cpp 
b/libminifi/src/FlowControlProtocol.cpp
index fee1a3b..a74e32e 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -346,27 +346,6 @@ int FlowControlProtocol::sendRegisterReq()
                                _logger->log_info("Flow Control Protocol 
receive report interval %d ms", reportInterval);
                                this->_reportInterval = reportInterval;
                        }
-                       else if (((FlowControlMsgID) msgID) == FLOW_YML_CONTENT)
-                       {
-                               uint32_t yamlLen;
-                               payloadPtr = this->decode(payloadPtr, yamlLen);
-                               _logger->log_info("Flow Control Protocol 
receive YAML content length %d", yamlLen);
-                               time_t rawtime;
-                               struct tm *timeinfo;
-                               time(&rawtime);
-                               timeinfo = localtime(&rawtime);
-                               std::string yamlFileName = "flow.";
-                               yamlFileName += asctime(timeinfo);
-                               yamlFileName += ".yml";
-                               std::ofstream fs;
-                               fs.open(yamlFileName.c_str(), std::fstream::out 
| std::fstream::binary | std::fstream::trunc);
-                               if (fs.is_open())
-                               {
-                                       fs.write((const char *)payloadPtr, 
yamlLen);
-                                       fs.close();
-                                       
this->_controller->reload(yamlFileName.c_str());
-                               }
-                       }
                        else
                        {
                                break;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 4bbc234..b6dc4e9 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -28,60 +28,52 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
-
+#include <future>
 #include "FlowController.h"
 #include "ProcessContext.h"
 
-FlowController *FlowController::_flowController(NULL);
-FlowController::FlowController(std::string name)
-: _name(name)
-{
-    uuid_generate(_uuid);
+FlowController *FlowControllerFactory::_flowController(NULL);
 
-    // Setup the default values
-    _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
-    _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
-    _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
-    _running = false;
-    _initialized = false;
-    _root = NULL;
-    _logger = Logger::getLogger();
-    _protocol = new FlowControlProtocol(this);
+FlowControllerImpl::FlowControllerImpl(std::string name)  {
+       uuid_generate(_uuid);
 
-    // NiFi config properties
-    _configure = Configure::getConfigure();
+       _name = name;
+       // Setup the default values
+       _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
+       _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
+       _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
+       _running = false;
+       _initialized = false;
+       _root = NULL;
+       _logger = Logger::getLogger();
+       _protocol = new FlowControlProtocol(this);
 
-    std::string rawConfigFileString;
-    _configure->get(Configure::nifi_flow_configuration_file, 
rawConfigFileString);
+       // NiFi config properties
+       _configure = Configure::getConfigure();
 
-       if (!rawConfigFileString.empty())
-       {
-        _configurationFileName = rawConfigFileString;
-    }
+       std::string rawConfigFileString;
+       _configure->get(Configure::nifi_flow_configuration_file,
+                       rawConfigFileString);
 
-    char *path = NULL;
-    char full_path[PATH_MAX];
+       if (!rawConfigFileString.empty()) {
+               _configurationFileName = rawConfigFileString;
+       }
 
-    std::string adjustedFilename;
-       if (!_configurationFileName.empty())
-       {
-        // perform a naive determination if this is a relative path
-               if (_configurationFileName.c_str()[0] != '/')
-               {
-            adjustedFilename = adjustedFilename + _configure->getHome() + "/" 
+ _configurationFileName;
+       char *path = NULL;
+       char full_path[PATH_MAX];
+
+       std::string adjustedFilename;
+       if (!_configurationFileName.empty()) {
+               // perform a naive determination if this is a relative path
+               if (_configurationFileName.c_str()[0] != '/') {
+                       adjustedFilename = adjustedFilename + 
_configure->getHome() + "/"
+                                       + _configurationFileName;
+               } else {
+                       adjustedFilename = _configurationFileName;
                }
-               else
-               {
-            adjustedFilename = _configurationFileName;
-        }
-    }
+       }
 
-    path = realpath(adjustedFilename.c_str(), full_path);
-       if (!path)
-       {
-        _logger->log_error("Could not locate path from provided configuration 
file name (%s).  Exiting.", full_path);
-        exit(1);
-    }
+       path = realpath(adjustedFilename.c_str(), full_path);
 
        std::string pathString(path);
        _configurationFileName = pathString;
@@ -90,14 +82,14 @@ FlowController::FlowController(std::string name)
        // Create the content repo directory if needed
        struct stat contentDirStat;
 
-       if (stat(DEFAULT_CONTENT_DIRECTORY, &contentDirStat) != -1 && 
S_ISDIR(contentDirStat.st_mode))
+       if (stat(ResourceClaim::default_directory_path.c_str(), 
&contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode))
        {
-               path = realpath(DEFAULT_CONTENT_DIRECTORY, full_path);
+               path = realpath(ResourceClaim::default_directory_path.c_str(), 
full_path);
                _logger->log_info("FlowController content directory %s", 
full_path);
        }
        else
        {
-          if (mkdir(DEFAULT_CONTENT_DIRECTORY, 0777) == -1)
+          if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1)
           {
                   _logger->log_error("FlowController content directory 
creation failed");
                   exit(1);
@@ -184,529 +176,614 @@ FlowController::FlowController(std::string name)
 
                        _logger->log_info("Load/Verify Client Certificate OK.");
                }
+               
+       }
+       if (!path) {
+               _logger->log_error(
+                               "Could not locate path from provided 
configuration file name (%s).  Exiting.",
+                               full_path);
+               exit(1);
        }
 
-    // Create repos for flow record and provenance
-    _provenanceRepo = new ProvenanceRepository();
-    _provenanceRepo->initialize();
 
-    _logger->log_info("FlowController %s created", _name.c_str());
+       // Create repos for flow record and provenance
+       _provenanceRepo = new ProvenanceRepository();
+       _provenanceRepo->initialize();
 }
 
-FlowController::~FlowController()
-{
-    stop(true);
-    unload();
-    delete _protocol;
-    delete _provenanceRepo;
-    if (_ctx)
-       SSL_CTX_free(_ctx);
-}
+FlowControllerImpl::~FlowControllerImpl() {
 
-bool FlowController::isRunning()
-{
-    return (_running);
+       stop(true);
+       unload();
+       if (NULL != _protocol)
+               delete _protocol;
+       if (NULL != _provenanceRepo)
+               delete _provenanceRepo;
+       
 }
 
-bool FlowController::isInitialized()
-{
-    return (_initialized);
-}
+void FlowControllerImpl::stop(bool force) {
 
-void FlowController::stop(bool force)
-{
-       if (_running)
-       {
-        _logger->log_info("Stop Flow Controller");
-        this->_timerScheduler.stop();
-        this->_eventScheduler.stop();
-        // Wait for sometime for thread stop
-        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-        if (this->_root)
-            this->_root->stopProcessing(
-                    &this->_timerScheduler,
-                    &this->_eventScheduler);
-        _running = false;
-    }
-}
+       if (_running) {
+               // immediately indicate that we are not running
+               _running = false;
 
-void FlowController::unload()
-{
-       if (_running)
-       {
-        stop(true);
-    }
-       if (_initialized)
-       {
-        _logger->log_info("Unload Flow Controller");
-        if (_root)
-            delete _root;
-        _root = NULL;
-        _initialized = false;
-        _name = "";
-    }
+               _logger->log_info("Stop Flow Controller");
+               this->_timerScheduler.stop();
+               this->_eventScheduler.stop();
+               // Wait for sometime for thread stop
+               std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+               if (this->_root)
+                       this->_root->stopProcessing(&this->_timerScheduler,
+                                       &this->_eventScheduler);
 
-    return;
+       }
 }
 
+/**
+ * This function will attempt to unload yaml and stop running Processors.
+ *
+ * If the latter attempt fails or does not complete within the prescribed
+ * period, _running will be set to false and we will return.
+ *
+ * @param timeToWaitMs Maximum time to wait before manually
+ * marking running as false.
+ */
+void FlowControllerImpl::waitUnload(const uint64_t timeToWaitMs) {
+       if (_running) {
+               // use the current time and increment with the provided 
argument.
+               std::chrono::system_clock::time_point wait_time =
+                               std::chrono::system_clock::now()
+                                               + 
std::chrono::milliseconds(timeToWaitMs);
+
+               // create an asynchronous future.
+               std::future<void> unload_task = std::async(std::launch::async,
+                               [this]() {unload();});
+
+               if (std::future_status::ready == 
unload_task.wait_until(wait_time)) {
+                       _running = false;
+               }
 
-Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
-{
-    Processor *processor = NULL;
-       if (name == GenerateFlowFile::ProcessorName)
-       {
-        processor = new GenerateFlowFile(name, uuid);
-       }
-       else if (name == LogAttribute::ProcessorName)
-       {
-        processor = new LogAttribute(name, uuid);
-       }
-       else if (name == RealTimeDataCollector::ProcessorName)
-       {
-        processor = new RealTimeDataCollector(name, uuid);
-       }
-       else if (name == GetFile::ProcessorName)
-       {
-        processor = new GetFile(name, uuid);
-       }
-       else if (name == PutFile::ProcessorName)
-       {
-        processor = new PutFile(name, uuid);
        }
-       else if (name == TailFile::ProcessorName)
-       {
-        processor = new TailFile(name, uuid);
-       }
-       else if (name == ListenSyslog::ProcessorName)
-       {
-        processor = new ListenSyslog(name, uuid);
+}
+
+
+void FlowControllerImpl::unload() {
+       if (_running) {
+               stop(true);
        }
-       else if (name == ExecuteProcess::ProcessorName)
-       {
-        processor = new ExecuteProcess(name, uuid);
+       if (_initialized) {
+               _logger->log_info("Unload Flow Controller");
+               if (_root)
+                       delete _root;
+               _root = NULL;
+               _initialized = false;
+               _name = "";
        }
-       else if (name == AppendHostInfo::ProcessorName)
-       {
-        processor = new AppendHostInfo(name, uuid);
+
+       return;
+}
+
+Processor *FlowControllerImpl::createProcessor(std::string name, uuid_t uuid) {
+       Processor *processor = NULL;
+       if (name == GenerateFlowFile::ProcessorName) {
+               processor = new GenerateFlowFile(name, uuid);
+       } else if (name == LogAttribute::ProcessorName) {
+               processor = new LogAttribute(name, uuid);
+       } else if (name == RealTimeDataCollector::ProcessorName) {
+               processor = new RealTimeDataCollector(name, uuid);
+       } else if (name == GetFile::ProcessorName) {
+               processor = new GetFile(name, uuid);
+       } else if (name == PutFile::ProcessorName) {
+               processor = new PutFile(name, uuid);
+       } else if (name == TailFile::ProcessorName) {
+               processor = new TailFile(name, uuid);
+       } else if (name == ListenSyslog::ProcessorName) {
+               processor = new ListenSyslog(name, uuid);
+       } else if (name == ExecuteProcess::ProcessorName) {
+               processor = new ExecuteProcess(name, uuid);
+       } else if (name == AppendHostInfo::ProcessorName) {
+               processor = new AppendHostInfo(name, uuid);
+       } else {
+               _logger->log_error("No Processor defined for %s", name.c_str());
+               return NULL;
        }
-       else
-       {
-        _logger->log_error("No Processor defined for %s", name.c_str());
-        return NULL;
-    }
 
-    //! initialize the processor
-    processor->initialize();
+       //! initialize the processor
+       processor->initialize();
 
-    return processor;
+       return processor;
 }
 
-ProcessGroup *FlowController::createRootProcessGroup(std::string name, uuid_t 
uuid)
-{
-    return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
+ProcessGroup *FlowControllerImpl::createRootProcessGroup(std::string name,
+               uuid_t uuid) {
+       return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
 }
 
-ProcessGroup *FlowController::createRemoteProcessGroup(std::string name, 
uuid_t uuid)
-{
-    return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
+ProcessGroup *FlowControllerImpl::createRemoteProcessGroup(std::string name,
+               uuid_t uuid) {
+       return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
 }
 
-Connection *FlowController::createConnection(std::string name, uuid_t uuid)
-{
-    return new Connection(name, uuid);
+Connection *FlowControllerImpl::createConnection(std::string name,
+               uuid_t uuid) {
+       return new Connection(name, uuid);
 }
 
+void FlowControllerImpl::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
+       uuid_t uuid;
+       ProcessGroup *group = NULL;
 
+       // generate the random UIID
+       uuid_generate(uuid);
 
-void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
-    uuid_t uuid;
-    ProcessGroup *group = NULL;
+       std::string flowName = rootFlowNode["name"].as<std::string>();
 
-    // generate the random UIID
-    uuid_generate(uuid);
+       char uuidStr[37];
+       uuid_unparse(_uuid, uuidStr);
+       _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
+       _logger->log_debug("parseRootProcessGroup: name => [%s]", 
flowName.c_str());
+       group = this->createRootProcessGroup(flowName, uuid);
+       this->_root = group;
+       this->_name = flowName;
+}
 
-    std::string flowName = rootFlowNode["name"].as<std::string>();
+void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode,
+               ProcessGroup *parentGroup) {
+       int64_t schedulingPeriod = -1;
+       int64_t penalizationPeriod = -1;
+       int64_t yieldPeriod = -1;
+       int64_t runDurationNanos = -1;
+       uuid_t uuid;
+       Processor *processor = NULL;
+
+       if (!parentGroup) {
+               _logger->log_error("parseProcessNodeYaml: no parent group 
exists");
+               return;
+       }
 
-    char uuidStr[37];
-    uuid_unparse(_uuid, uuidStr);
-    _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
-    _logger->log_debug("parseRootProcessGroup: name => [%s]", 
flowName.c_str());
-    group = this->createRootProcessGroup(flowName, uuid);
-    this->_root = group;
-    this->_name = flowName;
-}
+       if (processorsNode) {
+
+               if (processorsNode.IsSequence()) {
+                       // Evaluate sequence of processors
+                       int numProcessors = processorsNode.size();
+
+                       for (YAML::const_iterator iter = processorsNode.begin();
+                                       iter != processorsNode.end(); ++iter) {
+                               ProcessorConfig procCfg;
+                               YAML::Node procNode = iter->as<YAML::Node>();
+
+                               procCfg.name = 
procNode["name"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: name => 
[%s]",
+                                               procCfg.name.c_str());
+                               procCfg.javaClass = 
procNode["class"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: class 
=> [%s]",
+                                               procCfg.javaClass.c_str());
+
+                               char uuidStr[37];
+                               uuid_unparse(_uuid, uuidStr);
+
+                               // generate the random UUID
+                               uuid_generate(uuid);
+
+                               // Determine the processor name only from the 
Java class
+                               int lastOfIdx = 
procCfg.javaClass.find_last_of(".");
+                               if (lastOfIdx != std::string::npos) {
+                                       lastOfIdx++; // if a value is found, 
increment to move beyond the .
+                                       int nameLength = 
procCfg.javaClass.length() - lastOfIdx;
+                                       std::string processorName = 
procCfg.javaClass.substr(
+                                                       lastOfIdx, nameLength);
+                                       processor = 
this->createProcessor(processorName, uuid);
+                               }
 
-void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, 
ProcessGroup *parentGroup) {
-    int64_t schedulingPeriod = -1;
-    int64_t penalizationPeriod = -1;
-    int64_t yieldPeriod = -1;
-    int64_t runDurationNanos = -1;
-    uuid_t uuid;
-    Processor *processor = NULL;
-
-    if (!parentGroup) {
-        _logger->log_error("parseProcessNodeYaml: no parent group exists");
-        return;
-    }
+                               if (!processor) {
+                                       _logger->log_error(
+                                                       "Could not create a 
processor %s with name %s",
+                                                       procCfg.name.c_str(), 
uuidStr);
+                                       throw std::invalid_argument(
+                                                       "Could not create 
processor " + procCfg.name);
+                               }
+                               processor->setName(procCfg.name);
+
+                               procCfg.maxConcurrentTasks =
+                                               procNode["max concurrent 
tasks"].as<std::string>();
+                               _logger->log_debug(
+                                               "parseProcessorNode: max 
concurrent tasks => [%s]",
+                                               
procCfg.maxConcurrentTasks.c_str());
+                               procCfg.schedulingStrategy = 
procNode["scheduling strategy"].as<
+                                               std::string>();
+                               _logger->log_debug(
+                                               "parseProcessorNode: scheduling 
strategy => [%s]",
+                                               
procCfg.schedulingStrategy.c_str());
+                               procCfg.schedulingPeriod = procNode["scheduling 
period"].as<
+                                               std::string>();
+                               _logger->log_debug(
+                                               "parseProcessorNode: scheduling 
period => [%s]",
+                                               
procCfg.schedulingPeriod.c_str());
+                               procCfg.penalizationPeriod = 
procNode["penalization period"].as<
+                                               std::string>();
+                               _logger->log_debug(
+                                               "parseProcessorNode: 
penalization period => [%s]",
+                                               
procCfg.penalizationPeriod.c_str());
+                               procCfg.yieldPeriod =
+                                               procNode["yield 
period"].as<std::string>();
+                               _logger->log_debug("parseProcessorNode: yield 
period => [%s]",
+                                               procCfg.yieldPeriod.c_str());
+                               procCfg.yieldPeriod = procNode["run duration 
nanos"].as<
+                                               std::string>();
+                               _logger->log_debug(
+                                               "parseProcessorNode: run 
duration nanos => [%s]",
+                                               
procCfg.runDurationNanos.c_str());
+
+                               // handle auto-terminated relationships
+                               YAML::Node autoTerminatedSequence =
+                                               procNode["auto-terminated 
relationships list"];
+                               std::vector<std::string> 
rawAutoTerminatedRelationshipValues;
+                               if (autoTerminatedSequence.IsSequence()
+                                               && 
!autoTerminatedSequence.IsNull()
+                                               && 
autoTerminatedSequence.size() > 0) {
+                                       for (YAML::const_iterator relIter =
+                                                       
autoTerminatedSequence.begin();
+                                                       relIter != 
autoTerminatedSequence.end();
+                                                       ++relIter) {
+                                               std::string autoTerminatedRel =
+                                                               
relIter->as<std::string>();
+                                               
rawAutoTerminatedRelationshipValues.push_back(
+                                                               
autoTerminatedRel);
+                                       }
+                               }
+                               procCfg.autoTerminatedRelationships =
+                                               
rawAutoTerminatedRelationshipValues;
+
+                               // handle processor properties
+                               YAML::Node propertiesNode = 
procNode["Properties"];
+                               parsePropertiesNodeYaml(&propertiesNode, 
processor);
+
+                               // Take care of scheduling
+                               TimeUnit unit;
+                               if 
(Property::StringToTime(procCfg.schedulingPeriod,
+                                               schedulingPeriod, unit)
+                                               && 
Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
+                                                               
schedulingPeriod)) {
+                                       _logger->log_debug(
+                                                       "convert: 
parseProcessorNode: schedulingPeriod => [%d] ns",
+                                                       schedulingPeriod);
+                                       
processor->setSchedulingPeriodNano(schedulingPeriod);
+                               }
 
-    if (processorsNode) {
-
-        if (processorsNode.IsSequence()) {
-            // Evaluate sequence of processors
-            int numProcessors = processorsNode.size();
-
-
-            for (YAML::const_iterator iter = processorsNode.begin(); iter != 
processorsNode.end(); ++iter) {
-                ProcessorConfig procCfg;
-                YAML::Node procNode = iter->as<YAML::Node>();
-
-                procCfg.name = procNode["name"].as<std::string>();
-                _logger->log_debug("parseProcessorNode: name => [%s]", 
procCfg.name.c_str());
-                procCfg.javaClass = procNode["class"].as<std::string>();
-                _logger->log_debug("parseProcessorNode: class => [%s]", 
procCfg.javaClass.c_str());
-
-                char uuidStr[37];
-                uuid_unparse(_uuid, uuidStr);
-
-                // generate the random UUID
-                uuid_generate(uuid);
-
-                // Determine the processor name only from the Java class
-                int lastOfIdx = procCfg.javaClass.find_last_of(".");
-                if (lastOfIdx != std::string::npos) {
-                    lastOfIdx++; // if a value is found, increment to move 
beyond the .
-                    int nameLength = procCfg.javaClass.length() - lastOfIdx;
-                    std::string processorName = 
procCfg.javaClass.substr(lastOfIdx, nameLength);
-                    processor = this->createProcessor(processorName, uuid);
-                }
-
-                if (!processor) {
-                    _logger->log_error("Could not create a processor %s with 
name %s", procCfg.name.c_str(), uuidStr);
-                    throw std::invalid_argument("Could not create processor " 
+ procCfg.name);
-                }
-                processor->setName(procCfg.name);
-
-                procCfg.maxConcurrentTasks = procNode["max concurrent 
tasks"].as<std::string>();
-                               _logger->log_debug("parseProcessorNode: max 
concurrent tasks => [%s]", procCfg.maxConcurrentTasks.c_str());
-                procCfg.schedulingStrategy = procNode["scheduling 
strategy"].as<std::string>();
-                _logger->log_debug("parseProcessorNode: scheduling strategy => 
[%s]",
-                                   procCfg.schedulingStrategy.c_str());
-                procCfg.schedulingPeriod = procNode["scheduling 
period"].as<std::string>();
-                _logger->log_debug("parseProcessorNode: scheduling period => 
[%s]", procCfg.schedulingPeriod.c_str());
-                procCfg.penalizationPeriod = procNode["penalization 
period"].as<std::string>();
-                _logger->log_debug("parseProcessorNode: penalization period => 
[%s]",
-                                   procCfg.penalizationPeriod.c_str());
-                procCfg.yieldPeriod = procNode["yield 
period"].as<std::string>();
-                _logger->log_debug("parseProcessorNode: yield period => [%s]", 
procCfg.yieldPeriod.c_str());
-                procCfg.yieldPeriod = procNode["run duration 
nanos"].as<std::string>();
-                _logger->log_debug("parseProcessorNode: run duration nanos => 
[%s]", procCfg.runDurationNanos.c_str());
-
-                // handle auto-terminated relationships
-                YAML::Node autoTerminatedSequence = procNode["auto-terminated 
relationships list"];
-                std::vector<std::string> rawAutoTerminatedRelationshipValues;
-                if (autoTerminatedSequence.IsSequence() && 
!autoTerminatedSequence.IsNull()
-                    && autoTerminatedSequence.size() > 0) {
-                    for (YAML::const_iterator relIter = 
autoTerminatedSequence.begin();
-                         relIter != autoTerminatedSequence.end(); ++relIter) {
-                        std::string autoTerminatedRel = 
relIter->as<std::string>();
-                        
rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
-                    }
-                }
-                procCfg.autoTerminatedRelationships = 
rawAutoTerminatedRelationshipValues;
-
-                // handle processor properties
-                YAML::Node propertiesNode = procNode["Properties"];
-                parsePropertiesNodeYaml(&propertiesNode, processor);
-
-                // Take care of scheduling
-                TimeUnit unit;
-                if (Property::StringToTime(procCfg.schedulingPeriod, 
schedulingPeriod, unit)
-                    && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, 
schedulingPeriod)) {
-                    _logger->log_debug("convert: parseProcessorNode: 
schedulingPeriod => [%d] ns", schedulingPeriod);
-                    processor->setSchedulingPeriodNano(schedulingPeriod);
-                }
-
-                if (Property::StringToTime(procCfg.penalizationPeriod, 
penalizationPeriod, unit)
-                    && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, 
penalizationPeriod)) {
-                    _logger->log_debug("convert: parseProcessorNode: 
penalizationPeriod => [%d] ms",
-                                       penalizationPeriod);
-                    processor->setPenalizationPeriodMsec(penalizationPeriod);
-                }
-
-                if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, 
unit)
-                    && Property::ConvertTimeUnitToMS(yieldPeriod, unit, 
yieldPeriod)) {
-                    _logger->log_debug("convert: parseProcessorNode: 
yieldPeriod => [%d] ms", yieldPeriod);
-                    processor->setYieldPeriodMsec(yieldPeriod);
-                }
-
-                // Default to running
-                processor->setScheduledState(RUNNING);
-
-                if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
-                    processor->setSchedulingStrategy(TIMER_DRIVEN);
-                    _logger->log_debug("setting scheduling strategy as %s", 
procCfg.schedulingStrategy.c_str());
-                } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
-                    processor->setSchedulingStrategy(EVENT_DRIVEN);
-                    _logger->log_debug("setting scheduling strategy as %s", 
procCfg.schedulingStrategy.c_str());
-                } else {
-                    processor->setSchedulingStrategy(CRON_DRIVEN);
-                    _logger->log_debug("setting scheduling strategy as %s", 
procCfg.schedulingStrategy.c_str());
-
-                }
-
-                int64_t maxConcurrentTasks;
-                if (Property::StringToInt(procCfg.maxConcurrentTasks, 
maxConcurrentTasks)) {
-                    _logger->log_debug("parseProcessorNode: maxConcurrentTasks 
=> [%d]", maxConcurrentTasks);
-                    processor->setMaxConcurrentTasks(maxConcurrentTasks);
-                }
-
-                if (Property::StringToInt(procCfg.runDurationNanos, 
runDurationNanos)) {
-                    _logger->log_debug("parseProcessorNode: runDurationNanos 
=> [%d]", runDurationNanos);
-                    processor->setRunDurationNano(runDurationNanos);
-                }
-
-                std::set<Relationship> autoTerminatedRelationships;
-                for (auto &&relString : procCfg.autoTerminatedRelationships) {
-                    Relationship relationship(relString, "");
-                    _logger->log_debug("parseProcessorNode: 
autoTerminatedRelationship  => [%s]", relString.c_str());
-                    autoTerminatedRelationships.insert(relationship);
-                }
-
-                
processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-                parentGroup->addProcessor(processor);
-            }
-        }
-    } else {
-        throw new std::invalid_argument(
-                "Cannot instantiate a MiNiFi instance without a defined 
Processors configuration node.");
-    }
-}
+                               if 
(Property::StringToTime(procCfg.penalizationPeriod,
+                                               penalizationPeriod, unit)
+                                               && 
Property::ConvertTimeUnitToMS(penalizationPeriod,
+                                                               unit, 
penalizationPeriod)) {
+                                       _logger->log_debug(
+                                                       "convert: 
parseProcessorNode: penalizationPeriod => [%d] ms",
+                                                       penalizationPeriod);
+                                       
processor->setPenalizationPeriodMsec(penalizationPeriod);
+                               }
 
-void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, 
ProcessGroup *parentGroup) {
-    uuid_t uuid;
+                               if (Property::StringToTime(procCfg.yieldPeriod, 
yieldPeriod,
+                                               unit)
+                                               && 
Property::ConvertTimeUnitToMS(yieldPeriod, unit,
+                                                               yieldPeriod)) {
+                                       _logger->log_debug(
+                                                       "convert: 
parseProcessorNode: yieldPeriod => [%d] ms",
+                                                       yieldPeriod);
+                                       
processor->setYieldPeriodMsec(yieldPeriod);
+                               }
 
-    if (!parentGroup) {
-        _logger->log_error("parseRemoteProcessGroupYaml: no parent group 
exists");
-        return;
-    }
+                               // Default to running
+                               processor->setScheduledState(RUNNING);
+
+                               if (procCfg.schedulingStrategy == 
"TIMER_DRIVEN") {
+                                       
processor->setSchedulingStrategy(TIMER_DRIVEN);
+                                       _logger->log_debug("setting scheduling 
strategy as %s",
+                                                       
procCfg.schedulingStrategy.c_str());
+                               } else if (procCfg.schedulingStrategy == 
"EVENT_DRIVEN") {
+                                       
processor->setSchedulingStrategy(EVENT_DRIVEN);
+                                       _logger->log_debug("setting scheduling 
strategy as %s",
+                                                       
procCfg.schedulingStrategy.c_str());
+                               } else {
+                                       
processor->setSchedulingStrategy(CRON_DRIVEN);
+                                       _logger->log_debug("setting scheduling 
strategy as %s",
+                                                       
procCfg.schedulingStrategy.c_str());
 
-    if (rpgNode) {
-        if (rpgNode->IsSequence()) {
-            for (YAML::const_iterator iter = rpgNode->begin(); iter != 
rpgNode->end(); ++iter) {
-                YAML::Node rpgNode = iter->as<YAML::Node>();
+                               }
 
-                auto name = rpgNode["name"].as<std::string>();
-                _logger->log_debug("parseRemoteProcessGroupYaml: name => 
[%s]", name.c_str());
+                               int64_t maxConcurrentTasks;
+                               if 
(Property::StringToInt(procCfg.maxConcurrentTasks,
+                                               maxConcurrentTasks)) {
+                                       _logger->log_debug(
+                                                       "parseProcessorNode: 
maxConcurrentTasks => [%d]",
+                                                       maxConcurrentTasks);
+                                       
processor->setMaxConcurrentTasks(maxConcurrentTasks);
+                               }
 
-                std::string url = rpgNode["url"].as<std::string>();
-                _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", 
url.c_str());
+                               if 
(Property::StringToInt(procCfg.runDurationNanos,
+                                               runDurationNanos)) {
+                                       _logger->log_debug(
+                                                       "parseProcessorNode: 
runDurationNanos => [%d]",
+                                                       runDurationNanos);
+                                       
processor->setRunDurationNano(runDurationNanos);
+                               }
 
-                std::string timeout = rpgNode["timeout"].as<std::string>();
-                _logger->log_debug("parseRemoteProcessGroupYaml: timeout => 
[%s]", timeout.c_str());
+                               std::set<Relationship> 
autoTerminatedRelationships;
+                               for (auto &&relString : 
procCfg.autoTerminatedRelationships) {
+                                       Relationship relationship(relString, 
"");
+                                       _logger->log_debug(
+                                                       "parseProcessorNode: 
autoTerminatedRelationship  => [%s]",
+                                                       relString.c_str());
+                                       
autoTerminatedRelationships.insert(relationship);
+                               }
 
-                std::string yieldPeriod = rpgNode["yield 
period"].as<std::string>();
-                _logger->log_debug("parseRemoteProcessGroupYaml: yield period 
=> [%s]", yieldPeriod.c_str());
+                               processor->setAutoTerminatedRelationships(
+                                               autoTerminatedRelationships);
 
-                YAML::Node inputPorts = rpgNode["Input 
Ports"].as<YAML::Node>();
-                YAML::Node outputPorts = rpgNode["Output 
Ports"].as<YAML::Node>();
-                ProcessGroup *group = NULL;
+                               parentGroup->addProcessor(processor);
+                       }
+               }
+       } else {
+               throw new std::invalid_argument(
+                               "Cannot instantiate a MiNiFi instance without a 
defined Processors configuration node.");
+       }
+}
 
-                // generate the random UUID
-                uuid_generate(uuid);
+void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode,
+               ProcessGroup *parentGroup) {
+       uuid_t uuid;
 
-                char uuidStr[37];
-                uuid_unparse(_uuid, uuidStr);
+       if (!parentGroup) {
+               _logger->log_error(
+                               "parseRemoteProcessGroupYaml: no parent group 
exists");
+               return;
+       }
 
-                int64_t timeoutValue = -1;
-                int64_t yieldPeriodValue = -1;
+       if (rpgNode) {
+               if (rpgNode->IsSequence()) {
+                       for (YAML::const_iterator iter = rpgNode->begin();
+                                       iter != rpgNode->end(); ++iter) {
+                               YAML::Node rpgNode = iter->as<YAML::Node>();
+
+                               auto name = rpgNode["name"].as<std::string>();
+                               
_logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]",
+                                               name.c_str());
+
+                               std::string url = 
rpgNode["url"].as<std::string>();
+                               
_logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]",
+                                               url.c_str());
+
+                               std::string timeout = 
rpgNode["timeout"].as<std::string>();
+                               _logger->log_debug(
+                                               "parseRemoteProcessGroupYaml: 
timeout => [%s]",
+                                               timeout.c_str());
+
+                               std::string yieldPeriod =
+                                               rpgNode["yield 
period"].as<std::string>();
+                               _logger->log_debug(
+                                               "parseRemoteProcessGroupYaml: 
yield period => [%s]",
+                                               yieldPeriod.c_str());
+
+                               YAML::Node inputPorts = rpgNode["Input 
Ports"].as<YAML::Node>();
+                               YAML::Node outputPorts =
+                                               rpgNode["Output 
Ports"].as<YAML::Node>();
+                               ProcessGroup *group = NULL;
+
+                               // generate the random UUID
+                               uuid_generate(uuid);
+
+                               char uuidStr[37];
+                               uuid_unparse(_uuid, uuidStr);
+
+                               int64_t timeoutValue = -1;
+                               int64_t yieldPeriodValue = -1;
+
+                               group = 
this->createRemoteProcessGroup(name.c_str(), uuid);
+                               group->setParent(parentGroup);
+                               parentGroup->addProcessGroup(group);
+
+                               TimeUnit unit;
+
+                               if (Property::StringToTime(yieldPeriod, 
yieldPeriodValue, unit)
+                                               && 
Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
+                                                               
yieldPeriodValue) && group) {
+                                       _logger->log_debug(
+                                                       
"parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
+                                                       yieldPeriodValue);
+                                       
group->setYieldPeriodMsec(yieldPeriodValue);
+                               }
 
-                group = this->createRemoteProcessGroup(name.c_str(), uuid);
-                group->setParent(parentGroup);
-                parentGroup->addProcessGroup(group);
+                               if (Property::StringToTime(timeout, 
timeoutValue, unit)
+                                               && 
Property::ConvertTimeUnitToMS(timeoutValue, unit,
+                                                               timeoutValue) 
&& group) {
+                                       _logger->log_debug(
+                                                       
"parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
+                                                       timeoutValue);
+                                       group->setTimeOut(timeoutValue);
+                               }
 
-                TimeUnit unit;
+                               group->setTransmitting(true);
+                               group->setURL(url);
 
-                if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
-                    && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, 
yieldPeriodValue) && group) {
-                    _logger->log_debug("parseRemoteProcessGroupYaml: 
yieldPeriod => [%d] ms", yieldPeriodValue);
-                    group->setYieldPeriodMsec(yieldPeriodValue);
-                }
+                               if (inputPorts && inputPorts.IsSequence()) {
+                                       for (YAML::const_iterator portIter = 
inputPorts.begin();
+                                                       portIter != 
inputPorts.end(); ++portIter) {
+                                               _logger->log_debug("Got a 
current port, iterating...");
 
-                if (Property::StringToTime(timeout, timeoutValue, unit)
-                    && Property::ConvertTimeUnitToMS(timeoutValue, unit, 
timeoutValue) && group) {
-                    _logger->log_debug("parseRemoteProcessGroupYaml: 
timeoutValue => [%d] ms", timeoutValue);
-                    group->setTimeOut(timeoutValue);
-                }
+                                               YAML::Node currPort = 
portIter->as<YAML::Node>();
 
-                group->setTransmitting(true);
-                group->setURL(url);
+                                               this->parsePortYaml(&currPort, 
group, SEND);
+                                       } // for node
+                               }
+                               if (outputPorts && outputPorts.IsSequence()) {
+                                       for (YAML::const_iterator portIter = 
outputPorts.begin();
+                                                       portIter != 
outputPorts.end(); ++portIter) {
+                                               _logger->log_debug("Got a 
current port, iterating...");
 
-                if (inputPorts && inputPorts.IsSequence()) {
-                    for (YAML::const_iterator portIter = inputPorts.begin(); 
portIter != inputPorts.end(); ++portIter) {
-                        _logger->log_debug("Got a current port, iterating...");
+                                               YAML::Node currPort = 
portIter->as<YAML::Node>();
 
-                        YAML::Node currPort = portIter->as<YAML::Node>();
+                                               this->parsePortYaml(&currPort, 
group, RECEIVE);
+                                       } // for node
+                               }
 
-                        this->parsePortYaml(&currPort, group, SEND);
-                    } // for node
-                }
-                if (outputPorts && outputPorts.IsSequence()) {
-                    for (YAML::const_iterator portIter = outputPorts.begin(); 
portIter != outputPorts.end(); ++portIter) {
-                        _logger->log_debug("Got a current port, iterating...");
+                       }
+               }
+       }
+}
 
-                        YAML::Node currPort = portIter->as<YAML::Node>();
+void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode,
+               ProcessGroup *parent) {
+       uuid_t uuid;
+       Connection *connection = NULL;
 
-                        this->parsePortYaml(&currPort, group, RECEIVE);
-                    } // for node
-                }
+       if (!parent) {
+               _logger->log_error("parseProcessNode: no parent group was 
provided");
+               return;
+       }
 
-            }
-        }
-    }
-}
+       if (connectionsNode) {
+
+               if (connectionsNode->IsSequence()) {
+                       for (YAML::const_iterator iter = 
connectionsNode->begin();
+                                       iter != connectionsNode->end(); ++iter) 
{
+                               // generate the random UUID
+                               uuid_generate(uuid);
+
+                               YAML::Node connectionNode = 
iter->as<YAML::Node>();
+
+                               std::string name = 
connectionNode["name"].as<std::string>();
+                               std::string destName = 
connectionNode["destination name"].as<
+                                               std::string>();
+
+                               char uuidStr[37];
+                               uuid_unparse(_uuid, uuidStr);
+
+                               _logger->log_debug(
+                                               "Created connection with UUID 
%s and name %s", uuidStr,
+                                               name.c_str());
+                               connection = this->createConnection(name, uuid);
+                               auto rawRelationship =
+                                               connectionNode["source 
relationship name"].as<
+                                                               std::string>();
+                               Relationship relationship(rawRelationship, "");
+                               _logger->log_debug("parseConnection: 
relationship => [%s]",
+                                               rawRelationship.c_str());
+                               if (connection)
+                                       
connection->setRelationship(relationship);
+                               std::string connectionSrcProcName =
+                                               connectionNode["source 
name"].as<std::string>();
+
+                               Processor *srcProcessor = 
this->_root->findProcessor(
+                                               connectionSrcProcName);
+
+                               if (!srcProcessor) {
+                                       _logger->log_error(
+                                                       "Could not locate a 
source with name %s to create a connection",
+                                                       
connectionSrcProcName.c_str());
+                                       throw std::invalid_argument(
+                                                       "Could not locate a 
source with name %s to create a connection "
+                                                                       + 
connectionSrcProcName);
+                               }
 
-void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, 
ProcessGroup *parent) {
-    uuid_t uuid;
-    Connection *connection = NULL;
+                               Processor *destProcessor = 
this->_root->findProcessor(destName);
+                               // If we could not find name, try by UUID
+                               if (!destProcessor) {
+                                       uuid_t destUuid;
+                                       uuid_parse(destName.c_str(), destUuid);
+                                       destProcessor = 
this->_root->findProcessor(destUuid);
+                               }
+                               if (destProcessor) {
+                                       std::string destUuid = 
destProcessor->getUUIDStr();
+                               }
 
-    if (!parent) {
-        _logger->log_error("parseProcessNode: no parent group was provided");
-        return;
-    }
+                               uuid_t srcUuid;
+                               uuid_t destUuid;
+                               srcProcessor->getUUID(srcUuid);
+                               connection->setSourceProcessorUUID(srcUuid);
+                               destProcessor->getUUID(destUuid);
+                               
connection->setDestinationProcessorUUID(destUuid);
 
-    if (connectionsNode) {
-
-        if (connectionsNode->IsSequence()) {
-            for (YAML::const_iterator iter = connectionsNode->begin(); iter != 
connectionsNode->end(); ++iter) {
-                // generate the random UUID
-                uuid_generate(uuid);
-
-                YAML::Node connectionNode = iter->as<YAML::Node>();
-
-                std::string name = connectionNode["name"].as<std::string>();
-                std::string destName = connectionNode["destination 
name"].as<std::string>();
-
-                char uuidStr[37];
-                uuid_unparse(_uuid, uuidStr);
-
-                _logger->log_debug("Created connection with UUID %s and name 
%s", uuidStr, name.c_str());
-                connection = this->createConnection(name, uuid);
-                auto rawRelationship = connectionNode["source relationship 
name"].as<std::string>();
-                Relationship relationship(rawRelationship, "");
-                _logger->log_debug("parseConnection: relationship => [%s]", 
rawRelationship.c_str());
-                if (connection)
-                    connection->setRelationship(relationship);
-                std::string connectionSrcProcName = connectionNode["source 
name"].as<std::string>();
-
-                Processor *srcProcessor = 
this->_root->findProcessor(connectionSrcProcName);
-
-                if (!srcProcessor) {
-                    _logger->log_error("Could not locate a source with name %s 
to create a connection",
-                                       connectionSrcProcName.c_str());
-                    throw std::invalid_argument(
-                            "Could not locate a source with name %s to create 
a connection " + connectionSrcProcName);
-                }
-
-                Processor *destProcessor = 
this->_root->findProcessor(destName);
-                // If we could not find name, try by UUID
-                if (!destProcessor) {
-                    uuid_t destUuid;
-                    uuid_parse(destName.c_str(), destUuid);
-                    destProcessor = this->_root->findProcessor(destUuid);
-                }
-                if (destProcessor) {
-                    std::string destUuid = destProcessor->getUUIDStr();
-                }
-
-                uuid_t srcUuid;
-                uuid_t destUuid;
-                srcProcessor->getUUID(srcUuid);
-                connection->setSourceProcessorUUID(srcUuid);
-                destProcessor->getUUID(destUuid);
-                connection->setDestinationProcessorUUID(destUuid);
-
-                if (connection) {
-                    parent->addConnection(connection);
-                }
-            }
-        }
-
-        if (connection)
-            parent->addConnection(connection);
-
-        return;
-    }
-}
+                               if (connection) {
+                                       parent->addConnection(connection);
+                               }
+                       }
+               }
+
+               if (connection)
+                       parent->addConnection(connection);
 
+               return;
+       }
+}
 
-void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, 
TransferDirection direction) {
-    uuid_t uuid;
-    Processor *processor = NULL;
-    RemoteProcessorGroupPort *port = NULL;
+void FlowControllerImpl::parsePortYaml(YAML::Node *portNode,
+               ProcessGroup *parent, TransferDirection direction) {
+       uuid_t uuid;
+       Processor *processor = NULL;
+       RemoteProcessorGroupPort *port = NULL;
 
-    if (!parent) {
-        _logger->log_error("parseProcessNode: no parent group existed");
-        return;
-    }
+       if (!parent) {
+               _logger->log_error("parseProcessNode: no parent group existed");
+               return;
+       }
 
-    YAML::Node inputPortsObj = portNode->as<YAML::Node>();
+       YAML::Node inputPortsObj = portNode->as<YAML::Node>();
 
-    // generate the random UIID
-    uuid_generate(uuid);
+       // generate the random UIID
+       uuid_generate(uuid);
 
-    auto portId = inputPortsObj["id"].as<std::string>();
-    auto nameStr = inputPortsObj["name"].as<std::string>();
-    uuid_parse(portId.c_str(), uuid);
+       auto portId = inputPortsObj["id"].as<std::string>();
+       auto nameStr = inputPortsObj["name"].as<std::string>();
+       uuid_parse(portId.c_str(), uuid);
 
-    port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
+       port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
 
-    processor = (Processor *) port;
-    port->setDirection(direction);
-    port->setTimeOut(parent->getTimeOut());
-    port->setTransmitting(true);
-    processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
-    processor->initialize();
+       processor = (Processor *) port;
+       port->setDirection(direction);
+       port->setTimeOut(parent->getTimeOut());
+       port->setTransmitting(true);
+       processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
+       processor->initialize();
 
-    // handle port properties
-    YAML::Node nodeVal = portNode->as<YAML::Node>();
-    YAML::Node propertiesNode = nodeVal["Properties"];
+       // handle port properties
+       YAML::Node nodeVal = portNode->as<YAML::Node>();
+       YAML::Node propertiesNode = nodeVal["Properties"];
 
-    parsePropertiesNodeYaml(&propertiesNode, processor);
+       parsePropertiesNodeYaml(&propertiesNode, processor);
 
-    // add processor to parent
-    parent->addProcessor(processor);
-    processor->setScheduledState(RUNNING);
-    auto rawMaxConcurrentTasks = inputPortsObj["max concurrent 
tasks"].as<std::string>();
-    int64_t maxConcurrentTasks;
-    if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
-        processor->setMaxConcurrentTasks(maxConcurrentTasks);
-    }
-    _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", 
maxConcurrentTasks);
-    processor->setMaxConcurrentTasks(maxConcurrentTasks);
+       // add processor to parent
+       parent->addProcessor(processor);
+       processor->setScheduledState(RUNNING);
+       auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<
+                       std::string>();
+       int64_t maxConcurrentTasks;
+       if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
+               processor->setMaxConcurrentTasks(maxConcurrentTasks);
+       }
+       _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
+                       maxConcurrentTasks);
+       processor->setMaxConcurrentTasks(maxConcurrentTasks);
 
 }
 
-
-void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, 
Processor *processor)
-{
-    // Treat generically as a YAML node so we can perform inspection on 
entries to ensure they are populated
-    for (YAML::const_iterator propsIter = propertiesNode->begin(); propsIter 
!= propertiesNode->end(); ++propsIter)
-    {
-        std::string propertyName = propsIter->first.as<std::string>();
-        YAML::Node propertyValueNode = propsIter->second;
-        if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined())
-        {
-            std::string rawValueString = propertyValueNode.as<std::string>();
-            if (!processor->setProperty(propertyName, rawValueString))
-            {
-                _logger->log_warn("Received property %s with value %s but is 
not one of the properties for %s", propertyName.c_str(), 
rawValueString.c_str(), processor->getName().c_str());
-            }
-        }
-    }
+void FlowControllerImpl::parsePropertiesNodeYaml(YAML::Node *propertiesNode,
+               Processor *processor) {
+       // Treat generically as a YAML node so we can perform inspection on 
entries to ensure they are populated
+       for (YAML::const_iterator propsIter = propertiesNode->begin();
+                       propsIter != propertiesNode->end(); ++propsIter) {
+               std::string propertyName = propsIter->first.as<std::string>();
+               YAML::Node propertyValueNode = propsIter->second;
+               if (!propertyValueNode.IsNull() && 
propertyValueNode.IsDefined()) {
+                       std::string rawValueString = 
propertyValueNode.as<std::string>();
+                       if (!processor->setProperty(propertyName, 
rawValueString)) {
+                               _logger->log_warn(
+                                               "Received property %s with 
value %s but is not one of the properties for %s",
+                                               propertyName.c_str(), 
rawValueString.c_str(),
+                                               processor->getName().c_str());
+                       }
+               }
+       }
 }
 
-void FlowController::load() {
+void FlowControllerImpl::load() {
     if (_running) {
         stop(true);
     }
@@ -732,7 +809,7 @@ void FlowController::load() {
     }
 }
 
-void FlowController::reload(std::string yamlFile)
+void FlowControllerImpl::reload(std::string yamlFile)
 {
     _logger->log_info("Starting to reload Flow Controller with yaml %s", 
yamlFile.c_str());
     stop(true);
@@ -752,22 +829,24 @@ void FlowController::reload(std::string yamlFile)
     }
 }
 
-bool FlowController::start() {
-    if (!_initialized) {
-        _logger->log_error("Can not start Flow Controller because it has not 
been initialized");
-        return false;
-    } else {
-        if (!_running) {
-            _logger->log_info("Start Flow Controller");
-            this->_timerScheduler.start();
-            this->_eventScheduler.start();
-            if (this->_root)
-                this->_root->startProcessing(
-                        &this->_timerScheduler,
-                        &this->_eventScheduler);
-            _running = true;
-            this->_protocol->start();
-        }
-        return true;
-    }
+bool FlowControllerImpl::start() {
+       if (!_initialized) {
+               _logger->log_error(
+                               "Can not start Flow Controller because it has 
not been initialized");
+               return false;
+       } else {
+
+               if (!_running) {
+                       _logger->log_info("Starting Flow Controller");
+                       this->_timerScheduler.start();
+                       this->_eventScheduler.start();
+                       if (this->_root)
+                               
this->_root->startProcessing(&this->_timerScheduler,
+                                               &this->_eventScheduler);
+                       _running = true;
+                       this->_protocol->start();
+                       _logger->log_info("Started Flow Controller");
+               }
+               return true;
+       }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 2dda47a..9d01016 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -83,7 +83,7 @@ FlowFileRecord::~FlowFileRecord()
        {
                // Decrease the flow file record owned count for the resource 
claim
                _claim->decreaseFlowFileRecordOwnedCount();
-               if (_claim->getFlowFileRecordOwnedCount() == 0)
+               if (_claim->getFlowFileRecordOwnedCount() <= 0)
                {
                        _logger->log_debug("Delete Resource Claim %s", 
_claim->getContentFullPath().c_str());
                        std::remove(_claim->getContentFullPath().c_str());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/GetFile.cpp b/libminifi/src/GetFile.cpp
index 70969c9..53d285d 100644
--- a/libminifi/src/GetFile.cpp
+++ b/libminifi/src/GetFile.cpp
@@ -32,8 +32,12 @@
 #include <dirent.h>
 #include <limits.h>
 #include <unistd.h>
+#if  (__GNUC__ >= 4) 
+       #if (__GNUC_MINOR__ < 9)
+               #include <regex.h>
+       #endif
+#endif
 #include <regex>
-
 #include "TimeUtil.h"
 #include "GetFile.h"
 #include "ProcessContext.h"
@@ -81,6 +85,8 @@ void GetFile::initialize()
 void GetFile::onTrigger(ProcessContext *context, ProcessSession *session)
 {
        std::string value;
+
+       _logger->log_info("onTrigger GetFile");
        if (context->getProperty(Directory.getName(), value))
        {
                _directory = value;
@@ -97,6 +103,8 @@ void GetFile::onTrigger(ProcessContext *context, 
ProcessSession *session)
        {
                Property::StringToBool(value, _keepSourceFile);
        }
+
+       _logger->log_info("onTrigger GetFile");
        if (context->getProperty(MaxAge.getName(), value))
        {
                TimeUnit unit;
@@ -143,6 +151,7 @@ void GetFile::onTrigger(ProcessContext *context, 
ProcessSession *session)
        }
 
        // Perform directory list
+       _logger->log_info("Is listing empty %i",isListingEmpty());
        if (isListingEmpty())
        {
                if (_pollInterval == 0 || (getTimeMillis() - 
_lastDirectoryListingTime) > _pollInterval)
@@ -150,6 +159,7 @@ void GetFile::onTrigger(ProcessContext *context, 
ProcessSession *session)
                        performListing(_directory);
                }
        }
+       _logger->log_info("Is listing empty %i",isListingEmpty());
 
        if (!isListingEmpty())
        {
@@ -159,6 +169,7 @@ void GetFile::onTrigger(ProcessContext *context, 
ProcessSession *session)
                        pollListing(list, _batchSize);
                        while (!list.empty())
                        {
+
                                std::string fileName = list.front();
                                list.pop();
                                _logger->log_info("GetFile process %s", 
fileName.c_str());
@@ -185,6 +196,7 @@ void GetFile::onTrigger(ProcessContext *context, 
ProcessSession *session)
                        throw;
                }
        }
+
 }
 
 bool GetFile::isListingEmpty()
@@ -243,16 +255,34 @@ bool GetFile::acceptFile(std::string fullName, 
std::string name)
                if (_keepSourceFile == false && access(fullName.c_str(), W_OK) 
!= 0)
                        return false;
 
-               try {
-                       std::regex re(_fileFilter);
-                       if (!std::regex_match(name, re)) {
-                               return false;
-                       }
-               } catch (std::regex_error e) {
-                       _logger->log_error("Invalid File Filter regex: %s.", 
e.what());
-                       return false;
-               }
 
+       #ifdef __GNUC__ 
+               #if (__GNUC__ >= 4)
+                       #if (__GNUC_MINOR__ < 9)
+                               regex_t regex;
+                               int ret = regcomp(&regex, 
_fileFilter.c_str(),0);
+                               if (ret)
+                                       return false;
+                               ret = 
regexec(&regex,name.c_str(),(size_t)0,NULL,0);
+                               regfree(&regex);
+                               if (ret)
+                                       return false;   
+                       #else
+                               try{
+                                       std::regex re(_fileFilter);
+       
+                                       if (!std::regex_match(name, re)) {
+                                               return false;
+                                       }
+                               } catch (std::regex_error e) {
+                                       _logger->log_error("Invalid File Filter 
regex: %s.", e.what());
+                                       return false;
+                               }
+                       #endif
+               #endif
+               #else
+                       _logger->log_info("Cannot support regex filtering");
+               #endif
                return true;
        }
 
@@ -261,11 +291,14 @@ bool GetFile::acceptFile(std::string fullName, 
std::string name)
 
 void GetFile::performListing(std::string dir)
 {
+       _logger->log_info("Performing file listing against %s",dir.c_str());
        DIR *d;
        d = opendir(dir.c_str());
        if (!d)
                return;
-       while (1)
+       // only perform a listing while we are not empty
+       _logger->log_info("Performing file listing against %s",dir.c_str());
+       while (isRunning())
        {
                struct dirent *entry;
                entry = readdir(d);

Reply via email to