Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 63dbb8241 -> 09d973baf


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
index 7c98278..b3cb8ac 100644
--- a/libminifi/src/ProcessGroup.cpp
+++ b/libminifi/src/ProcessGroup.cpp
@@ -135,9 +135,10 @@ void 
ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
        try
        {
                // Start all the processor node, input and output ports
-               for (std::set<Processor *>::iterator it = _processors.begin(); 
it != _processors.end(); ++it)
+               for(auto processor : _processors)
                {
-                       Processor *processor(*it);
+                       _logger->log_debug("Starting 
%s",processor->getName().c_str());
+
                        if (!processor->isRunning() && 
processor->getScheduledState() != DISABLED)
                        {
                                if (processor->getSchedulingStrategy() == 
TIMER_DRIVEN)
@@ -146,10 +147,9 @@ void 
ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
                                        eventScheduler->schedule(processor);
                        }
                }
-
-               for (std::set<ProcessGroup *>::iterator it = 
_childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
+               // Start processing the group
+               for(auto processGroup : _childProcessGroups)
                {
-                       ProcessGroup *processGroup(*it);
                        processGroup->startProcessing(timeScheduler, 
eventScheduler);
                }
        }
@@ -202,12 +202,14 @@ void 
ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
 
 Processor *ProcessGroup::findProcessor(uuid_t uuid)
 {
+
        Processor *ret = NULL;
        // std::lock_guard<std::mutex> lock(_mtx);
 
        for (std::set<Processor *>::iterator it = _processors.begin(); it != 
_processors.end(); ++it)
        {
                Processor *processor(*it);
+               _logger->log_info("find processor 
%s",processor->getName().c_str());
                uuid_t processorUUID;
                if (processor->getUUID(processorUUID) && 
uuid_compare(processorUUID, uuid) == 0)
                        return processor;
@@ -215,7 +217,9 @@ Processor *ProcessGroup::findProcessor(uuid_t uuid)
 
        for (std::set<ProcessGroup *>::iterator it = 
_childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
        {
+
                ProcessGroup *processGroup(*it);
+               _logger->log_info("find processor child 
%s",processGroup->getName().c_str());
                Processor *processor = processGroup->findProcessor(uuid);
                if (processor)
                        return processor;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp
index abe75bd..f3d769e 100644
--- a/libminifi/src/ProcessSession.cpp
+++ b/libminifi/src/ProcessSession.cpp
@@ -204,7 +204,7 @@ void ProcessSession::write(FlowFileRecord *flow, 
OutputStreamCallback *callback)
 {
        ResourceClaim *claim = NULL;
 
-       claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+       claim = new ResourceClaim();
 
        try
        {
@@ -382,7 +382,7 @@ void ProcessSession::import(std::string source, 
FlowFileRecord *flow, bool keepS
 {
        ResourceClaim *claim = NULL;
 
-       claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+       claim = new ResourceClaim();
        char *buf = NULL;
        int size = 4096;
        buf = new char [size];
@@ -420,9 +420,10 @@ void ProcessSession::import(std::string source, 
FlowFileRecord *flow, bool keepS
                                }
                                flow->_claim = claim;
                                claim->increaseFlowFileRecordOwnedCount();
-                               /*
+
                                _logger->log_debug("Import offset %d length %d 
into content %s for FlowFile UUID %s",
-                                               flow->_offset, flow->_size, 
flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+                                               flow->_offset, flow->_size, 
flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+
                                fs.close();
                                input.close();
                                if (!keepSource)
@@ -478,10 +479,9 @@ void ProcessSession::commit()
        try
        {
                // First we clone the flow record based on the transfered 
relationship for updated flow record
-               std::map<std::string, FlowFileRecord *>::iterator it;
-               for (it = _updatedFlowFiles.begin(); it!= 
_updatedFlowFiles.end(); it++)
+               for (auto && it : _updatedFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        if (record->_markedDelete)
                                continue;
                        std::map<std::string, Relationship>::iterator 
itRelationship =
@@ -537,11 +537,10 @@ void ProcessSession::commit()
                                throw Exception(PROCESS_SESSION_EXCEPTION, "Can 
not find the transfer relationship for the flow");
                        }
                }
-
                // Do the samething for added flow file
-               for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); 
it++)
+               for(const auto it : _addedFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        if (record->_markedDelete)
                                continue;
                        std::map<std::string, Relationship>::iterator 
itRelationship =
@@ -597,11 +596,10 @@ void ProcessSession::commit()
                                throw Exception(PROCESS_SESSION_EXCEPTION, "Can 
not find the transfer relationship for the flow");
                        }
                }
-
                // Complete process the added and update flow files for the 
session, send the flow file to its queue
-               for (it = _updatedFlowFiles.begin(); it!= 
_updatedFlowFiles.end(); it++)
+               for(const auto &it : _updatedFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        if (record->_markedDelete)
                        {
                                continue;
@@ -611,9 +609,9 @@ void ProcessSession::commit()
                        else
                                delete record;
                }
-               for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); 
it++)
+               for(const auto &it : _addedFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        if (record->_markedDelete)
                        {
                                continue;
@@ -624,9 +622,9 @@ void ProcessSession::commit()
                                delete record;
                }
                // Process the clone flow files
-               for (it = _clonedFlowFiles.begin(); it!= 
_clonedFlowFiles.end(); it++)
+               for(const auto &it : _clonedFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        if (record->_markedDelete)
                        {
                                continue;
@@ -637,15 +635,15 @@ void ProcessSession::commit()
                                delete record;
                }
                // Delete the deleted flow files
-               for (it = _deletedFlowFiles.begin(); it!= 
_deletedFlowFiles.end(); it++)
+               for(const auto &it : _deletedFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        delete record;
                }
                // Delete the snapshot
-               for (it = _originalFlowFiles.begin(); it!= 
_originalFlowFiles.end(); it++)
+               for(const auto &it : _originalFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        delete record;
                }
                // All done
@@ -675,11 +673,10 @@ void ProcessSession::rollback()
 {
        try
        {
-               std::map<std::string, FlowFileRecord *>::iterator it;
                // Requeue the snapshot of the flowfile back
-               for (it = _originalFlowFiles.begin(); it!= 
_originalFlowFiles.end(); it++)
+               for(const auto &it : _originalFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        if (record->_orginalConnection)
                        {
                                record->_snapshot = false;
@@ -690,21 +687,21 @@ void ProcessSession::rollback()
                }
                _originalFlowFiles.clear();
                // Process the clone flow files
-               for (it = _clonedFlowFiles.begin(); it!= 
_clonedFlowFiles.end(); it++)
+               for(const auto &it : _clonedFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        delete record;
                }
                _clonedFlowFiles.clear();
-               for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); 
it++)
+               for(const auto &it : _addedFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        delete record;
                }
                _addedFlowFiles.clear();
-               for (it = _updatedFlowFiles.begin(); it!= 
_updatedFlowFiles.end(); it++)
+               for(const auto &it : _updatedFlowFiles)
                {
-                       FlowFileRecord *record = it->second;
+                       FlowFileRecord *record = it.second;
                        delete record;
                }
                _updatedFlowFiles.clear();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp
index 6a11893..1b8e286 100644
--- a/libminifi/src/Processor.cpp
+++ b/libminifi/src/Processor.cpp
@@ -42,7 +42,7 @@ Processor::Processor(std::string name, uuid_t uuid)
        char uuidStr[37];
        uuid_unparse(_uuid, uuidStr);
        _uuidStr = uuidStr;
-
+       _hasWork.store(false);
        // Setup the default values
        _state = DISABLED;
        _strategy = TIMER_DRIVEN;
@@ -57,7 +57,6 @@ Processor::Processor(std::string name, uuid_t uuid)
        _yieldExpiration = 0;
        _incomingConnectionsIter = this->_incomingConnections.begin();
        _logger = Logger::getLogger();
-
        _logger->log_info("Processor %s created UUID %s", _name.c_str(), 
_uuidStr.c_str());
 }
 
@@ -83,9 +82,8 @@ bool Processor::setSupportedProperties(std::set<Property> 
properties)
        std::lock_guard<std::mutex> lock(_mtx);
 
        _properties.clear();
-       for (std::set<Property>::iterator it = properties.begin(); it != 
properties.end(); ++it)
+       for (auto item : properties)
        {
-               Property item(*it);
                _properties[item.getName()] = item;
                _logger->log_info("Processor %s supported property name %s", 
_name.c_str(), item.getName().c_str());
        }
@@ -105,9 +103,8 @@ bool 
Processor::setSupportedRelationships(std::set<Relationship> relationships)
        std::lock_guard<std::mutex> lock(_mtx);
 
        _relationships.clear();
-       for (std::set<Relationship>::iterator it = relationships.begin(); it != 
relationships.end(); ++it)
+       for(auto item : relationships)
        {
-               Relationship item(*it);
                _relationships[item.getName()] = item;
                _logger->log_info("Processor %s supported relationship name 
%s", _name.c_str(), item.getName().c_str());
        }
@@ -127,9 +124,8 @@ bool 
Processor::setAutoTerminatedRelationships(std::set<Relationship> relationsh
        std::lock_guard<std::mutex> lock(_mtx);
 
        _autoTerminatedRelationships.clear();
-       for (std::set<Relationship>::iterator it = relationships.begin(); it != 
relationships.end(); ++it)
+       for(auto item : relationships)
        {
-               Relationship item(*it);
                _autoTerminatedRelationships[item.getName()] = item;
                _logger->log_info("Processor %s auto terminated relationship 
name %s", _name.c_str(), item.getName().c_str());
        }
@@ -140,21 +136,18 @@ bool 
Processor::setAutoTerminatedRelationships(std::set<Relationship> relationsh
 bool Processor::isAutoTerminated(Relationship relationship)
 {
        bool isRun = isRunning();
+       // Take _mtx only while NOT running, exactly like the removed `if (!isRun) _mtx.lock()`; a default-constructed unique_lock owns no mutex.
+       auto conditionalLock = !isRun ? 
+                         std::unique_lock<std::mutex>(_mtx) 
+                       : std::unique_lock<std::mutex>();
 
-       if (!isRun)
-               _mtx.lock();
-
-       std::map<std::string, Relationship>::iterator it = 
_autoTerminatedRelationships.find(relationship.getName());
+       const auto &it = 
_autoTerminatedRelationships.find(relationship.getName());
        if (it != _autoTerminatedRelationships.end())
        {
-               if (!isRun)
-                       _mtx.unlock();
                return true;
        }
        else
        {
-               if (!isRun)
-                       _mtx.unlock();
                return false;
        }
 }
@@ -163,20 +156,17 @@ bool Processor::isSupportedRelationship(Relationship 
relationship)
 {
        bool isRun = isRunning();
 
-       if (!isRun)
-               _mtx.lock();
+       auto conditionalLock = !isRun ? // lock only while NOT running, matching the removed `if (!isRun) _mtx.lock()`
+                         std::unique_lock<std::mutex>(_mtx) 
+                       : std::unique_lock<std::mutex>();
 
-       std::map<std::string, Relationship>::iterator it = 
_relationships.find(relationship.getName());
+       const auto &it = _relationships.find(relationship.getName());
        if (it != _relationships.end())
        {
-               if (!isRun)
-                       _mtx.unlock();
                return true;
        }
        else
        {
-               if (!isRun)
-                       _mtx.unlock();
                return false;
        }
 }
@@ -185,23 +175,20 @@ bool Processor::getProperty(std::string name, std::string 
&value)
 {
        bool isRun = isRunning();
 
-       if (!isRun)
-               // Because set property only allowed in non running state, we 
need to obtain lock avoid rack condition
-               _mtx.lock();
-
-       std::map<std::string, Property>::iterator it = _properties.find(name);
+       // Properties may only be set while not running, so _mtx is taken in exactly that state; a default-constructed unique_lock owns no mutex.
+        auto conditionalLock = !isRun ? 
+                           std::unique_lock<std::mutex>(_mtx) 
+                         : std::unique_lock<std::mutex>();
+                        
+       const auto &it = _properties.find(name);
        if (it != _properties.end())
        {
                Property item = it->second;
                value = item.getValue();
-               if (!isRun)
-                       _mtx.unlock();
                return true;
        }
        else
        {
-               if (!isRun)
-                       _mtx.unlock();
                return false;
        }
 }
@@ -210,7 +197,7 @@ bool Processor::setProperty(std::string name, std::string 
value)
 {
 
        std::lock_guard<std::mutex> lock(_mtx);
-       std::map<std::string, Property>::iterator it = _properties.find(name);
+       auto &&it = _properties.find(name);
 
        if (it != _properties.end())
        {
@@ -254,7 +241,7 @@ std::set<Connection *> 
Processor::getOutGoingConnections(std::string relationshi
 {
        std::set<Connection *> empty;
 
-       std::map<std::string, std::set<Connection *>>::iterator it = 
_outGoingConnections.find(relationship);
+       auto  &&it = _outGoingConnections.find(relationship);
        if (it != _outGoingConnections.end())
        {
                return _outGoingConnections[relationship];
@@ -269,6 +256,7 @@ bool Processor::addConnection(Connection *connection)
 {
        bool ret = false;
 
+
        if (isRunning())
        {
                _logger->log_info("Can not add connection while the process %s 
is running",
@@ -276,6 +264,7 @@ bool Processor::addConnection(Connection *connection)
                return false;
        }
 
+
        std::lock_guard<std::mutex> lock(_mtx);
 
        uuid_t srcUUID;
@@ -283,8 +272,14 @@ bool Processor::addConnection(Connection *connection)
 
        connection->getSourceProcessorUUID(srcUUID);
        connection->getDestinationProcessorUUID(destUUID);
+       char uuid_str[37];
 
-       if (uuid_compare(_uuid, destUUID) == 0)
+
+       uuid_unparse_lower(_uuid, uuid_str);
+       std::string my_uuid = uuid_str;
+       uuid_unparse_lower(destUUID, uuid_str);
+       std::string destination_uuid = uuid_str;
+       if (my_uuid == destination_uuid)
        {
                // Connection is destination to the current processor
                if (_incomingConnections.find(connection) == 
_incomingConnections.end())
@@ -297,12 +292,13 @@ bool Processor::addConnection(Connection *connection)
                        ret = true;
                }
        }
-
-       if (uuid_compare(_uuid, srcUUID) == 0)
+       uuid_unparse_lower(srcUUID, uuid_str);
+       std::string source_uuid = uuid_str;
+       if (my_uuid == source_uuid)
        {
                std::string relationship = 
connection->getRelationship().getName();
                // Connection is source from the current processor
-               std::map<std::string, std::set<Connection *>>::iterator it =
+               auto &&it =
                                _outGoingConnections.find(relationship);
                if (it != _outGoingConnections.end())
                {
@@ -321,6 +317,7 @@ bool Processor::addConnection(Connection *connection)
                }
                else
                {
+
                        // We do not have any outgoing connection for this 
relationship yet
                        std::set<Connection *> newConnection;
                        newConnection.insert(connection);
@@ -331,6 +328,7 @@ bool Processor::addConnection(Connection *connection)
                        ret = true;
                }
        }
+       
 
        return ret;
 }
@@ -369,7 +367,7 @@ void Processor::removeConnection(Connection *connection)
        {
                std::string relationship = 
connection->getRelationship().getName();
                // Connection is source from the current processor
-               std::map<std::string, std::set<Connection *>>::iterator it =
+               auto &&it =
                                _outGoingConnections.find(relationship);
                if (it == _outGoingConnections.end())
                {
@@ -414,9 +412,8 @@ bool Processor::flowFilesQueued()
        if (_incomingConnections.size() == 0)
                return false;
 
-       for (std::set<Connection *>::iterator it = 
_incomingConnections.begin(); it != _incomingConnections.end(); ++it)
+       for(auto &&connection : _incomingConnections)
        {
-               Connection *connection = *it;
                if (connection->getQueueSize() > 0)
                        return true;
        }
@@ -428,15 +425,12 @@ bool Processor::flowFilesOutGoingFull()
 {
        std::lock_guard<std::mutex> lock(_mtx);
 
-       std::map<std::string, std::set<Connection *>>::iterator it;
-
-       for (it = _outGoingConnections.begin(); it != 
_outGoingConnections.end(); ++it)
+        for(auto &&connection : _outGoingConnections)
        {
                // We already has connection for this relationship
-               std::set<Connection *> existedConnection = it->second;
-               for (std::set<Connection *>::iterator itConnection = 
existedConnection.begin(); itConnection != existedConnection.end(); 
++itConnection)
+               std::set<Connection *> existedConnection = connection.second;
+               for(const auto connection : existedConnection)
                {
-                       Connection *connection = *itConnection;
                        if (connection->isFull())
                                return true;
                }
@@ -476,15 +470,14 @@ void Processor::onTrigger()
 
 void Processor::waitForWork(uint64_t timeoutMs)
 {
-       std::unique_lock<std::mutex> lock(_workAvailableMtx);
-       _hasWork = isWorkAvailable();
+       _hasWork.store( isWorkAvailable() );
 
-       if (!_hasWork)
+       if (!_hasWork.load())
        {
-               _hasWorkCondition.wait_for(lock, 
std::chrono::milliseconds(timeoutMs), [&] { return _hasWork; });
+           std::unique_lock<std::mutex> lock(_workAvailableMtx);
+           _hasWorkCondition.wait_for(lock, 
std::chrono::milliseconds(timeoutMs), [&] { return _hasWork.load(); });
        }
 
-       lock.unlock();
 }
 
 void Processor::notifyWork()
@@ -496,17 +489,12 @@ void Processor::notifyWork()
        }
 
        {
-               std::unique_lock<std::mutex> lock(_workAvailableMtx);
-               _hasWork = isWorkAvailable();
-
-               // Keep a scope-local copy of the state to avoid race conditions
-               bool hasWork = _hasWork;
+               _hasWork.store( isWorkAvailable() );
 
-               lock.unlock();
 
-               if (hasWork)
+               if (_hasWork.load())
                {
-                       _hasWorkCondition.notify_one();
+                     _hasWorkCondition.notify_one();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Provenance.cpp b/libminifi/src/Provenance.cpp
index a2a4310..c21de76 100644
--- a/libminifi/src/Provenance.cpp
+++ b/libminifi/src/Provenance.cpp
@@ -17,648 +17,369 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <cstdint>
+#include <vector>
+#include <arpa/inet.h>
+#include "Serializable.h"
 #include "Provenance.h"
 #include "Relationship.h"
 #include "Logger.h"
 #include "FlowController.h"
 
-int ProvenanceEventRecord::readUTF(std::string &str, bool widen)
-{
-    uint16_t utflen;
-    int ret;
-
-    if (!widen)
-    {
-       ret = read(utflen);
-       if (ret <= 0)
-               return ret;
-    }
-    else
-    {
-       uint32_t len;
-               ret = read(len);
-        if (ret <= 0)
-               return ret;
-        utflen = len;
-    }
-
-    uint8_t *bytearr = NULL;
-    char *chararr = NULL;
-    bytearr = new uint8_t[utflen];
-    chararr = new char[utflen];
-    memset(chararr, 0, utflen);
-
-    int c, char2, char3;
-    int count = 0;
-    int chararr_count=0;
-
-    ret = read(bytearr, utflen);
-    if (ret <= 0)
-    {
-       delete[] bytearr;
-       delete[] chararr;
-       if (ret == 0)
-       {
-        if (!widen)
-               return (2 + utflen);
-           else
-               return (4 + utflen);
-       }
-       else
-               return ret;
-    }
-
-    while (count < utflen) {
-        c = (int) bytearr[count] & 0xff;
-        if (c > 127) break;
-        count++;
-        chararr[chararr_count++]=(char)c;
-    }
-
-    while (count < utflen) {
-        c = (int) bytearr[count] & 0xff;
-        switch (c >> 4) {
-            case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
-                /* 0xxxxxxx*/
-                count++;
-                chararr[chararr_count++]=(char)c;
-                break;
-            case 12: case 13:
-                /* 110x xxxx   10xx xxxx*/
-                count += 2;
-                if (count > utflen)
-                {
-                       delete[] bytearr;
-                       delete[] chararr;
-                       return -1;
-                }
-                char2 = (int) bytearr[count-1];
-                if ((char2 & 0xC0) != 0x80)
-                {
-                       delete[] bytearr;
-                       delete[] chararr;
-                       return -1;
-                }
-                chararr[chararr_count++]=(char)(((c & 0x1F) << 6) |
-                                                (char2 & 0x3F));
-                break;
-            case 14:
-                /* 1110 xxxx  10xx xxxx  10xx xxxx */
-                count += 3;
-                if (count > utflen)
-                {
-                       delete[] bytearr;
-                       delete[] chararr;
-                       return -1;
-                }
-                char2 = (int) bytearr[count-2];
-                char3 = (int) bytearr[count-1];
-                if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
-                {
-                       delete[] bytearr;
-                       delete[] chararr;
-                       return -1;
-                }
-                chararr[chararr_count++]=(char)(((c     & 0x0F) << 12) |
-                                                ((char2 & 0x3F) << 6)  |
-                                                ((char3 & 0x3F) << 0));
-                break;
-            default:
-               delete[] bytearr;
-               delete[] chararr;
-               return -1;
-        }
-    }
-    // The number of chars produced may be less than utflen
-    std::string value(chararr, chararr_count);
-    str = value;
-    delete[] bytearr;
-    delete[] chararr;
-    if (!widen)
-       return (2 + utflen);
-    else
-       return (4 + utflen);
-}
-
-int ProvenanceEventRecord::writeUTF(std::string str, bool widen)
-{
-       int strlen = str.length();
-       int utflen = 0;
-       int c, count = 0;
-
-       /* use charAt instead of copying String to char array */
-       for (int i = 0; i < strlen; i++) {
-               c = str.at(i);
-               if ((c >= 0x0001) && (c <= 0x007F)) {
-                       utflen++;
-               } else if (c > 0x07FF) {
-                       utflen += 3;
-               } else {
-                       utflen += 2;
-               }
-       }
-
-       if (utflen > 65535)
-               return -1;
-
-       uint8_t *bytearr = NULL;
-       if (!widen)
-       {
-               bytearr = new uint8_t[utflen+2];
-               bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF);
-               bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF);
-       }
-       else
-       {
-               bytearr = new uint8_t[utflen+4];
-               bytearr[count++] = (uint8_t) ((utflen >> 24) & 0xFF);
-               bytearr[count++] = (uint8_t) ((utflen >> 16) & 0xFF);
-               bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF);
-               bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF);
-       }
-
-       int i=0;
-       for (i=0; i<strlen; i++) {
-               c = str.at(i);
-               if (!((c >= 0x0001) && (c <= 0x007F))) break;
-               bytearr[count++] = (uint8_t) c;
-       }
-
-       for (;i < strlen; i++){
-               c = str.at(i);
-               if ((c >= 0x0001) && (c <= 0x007F)) {
-                       bytearr[count++] = (uint8_t) c;
-               } else if (c > 0x07FF) {
-                       bytearr[count++] = (uint8_t) (0xE0 | ((c >> 12) & 
0x0F));
-                       bytearr[count++] = (uint8_t) (0x80 | ((c >>  6) & 
0x3F));
-                       bytearr[count++] = (uint8_t) (0x80 | ((c >>  0) & 
0x3F));
-               } else {
-                       bytearr[count++] = (uint8_t) (0xC0 | ((c >>  6) & 
0x1F));
-                       bytearr[count++] = (uint8_t) (0x80 | ((c >>  0) & 
0x3F));
-               }
-       }
-       int ret;
-       if (!widen)
-       {
-               ret = writeData(bytearr, utflen+2);
-       }
-       else
-       {
-               ret = writeData(bytearr, utflen+4);
-       }
-       delete[] bytearr;
-       return ret;
-}
-
 //! DeSerialize
-bool ProvenanceEventRecord::DeSerialize(ProvenanceRepository *repo, 
std::string key)
-{
+bool ProvenanceEventRecord::DeSerialize(ProvenanceRepository *repo,
+               std::string key) {
        std::string value;
        bool ret;
 
        ret = repo->Get(key, value);
 
-       if (!ret)
-       {
-               _logger->log_error("NiFi Provenance Store event %s can not 
found", key.c_str());
+       if (!ret) {
+               _logger->log_error("NiFi Provenance Store event %s can not 
found",
+                               key.c_str());
                return false;
-       }
-       else
-               _logger->log_debug("NiFi Provenance Read event %s length %d", 
key.c_str(), value.length());
+       } else
+               _logger->log_debug("NiFi Provenance Read event %s length %d",
+                               key.c_str(), value.length());
 
-       ret = DeSerialize((unsigned char *) value.data(), value.length());
 
-       if (ret)
-       {
-               _logger->log_debug("NiFi Provenance retrieve event %s size %d 
eventType %d success", _eventIdStr.c_str(), _serializeBufSize, _eventType);
-       }
-       else
-       {
-               _logger->log_debug("NiFi Provenance retrieve event %s size %d 
eventType %d fail", _eventIdStr.c_str(), _serializeBufSize, _eventType);
+       DataStream stream((const uint8_t*)value.data(),value.length());
+
+       ret = DeSerialize(stream);
+
+       if (ret) {
+               _logger->log_debug(
+                               "NiFi Provenance retrieve event %s size %d 
eventType %d success",
+                               _eventIdStr.c_str(), stream.getSize(), 
_eventType);
+       } else {
+               _logger->log_debug(
+                               "NiFi Provenance retrieve event %s size %d 
eventType %d fail",
+                               _eventIdStr.c_str(), stream.getSize(), 
_eventType);
        }
 
        return ret;
 }
 
-bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo)
-{
-       if (_serializedBuf)
-               // Serialize in progress
-               return false;
-       _serializedBuf = NULL;
-       _serializeBufSize = 0;
-       _maxSerializeBufSize = 0;
-       _serializedBuf = new uint8_t[PROVENANCE_EVENT_RECORD_SEG_SIZE];
-       if (!_serializedBuf)
-               return false;
-       _maxSerializeBufSize = PROVENANCE_EVENT_RECORD_SEG_SIZE;
+bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo) {
+
+       DataStream outStream;
 
        int ret;
 
-       ret = writeUTF(this->_eventIdStr);
-       if (ret <= 0)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = writeUTF(this->_eventIdStr,&outStream);
+       if (ret <= 0) {
+
                return false;
        }
 
        uint32_t eventType = this->_eventType;
-       ret = write(eventType);
-       if (ret != 4)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = write(eventType,&outStream);
+       if (ret != 4) {
+
                return false;
        }
 
-       ret = write(this->_eventTime);
-       if (ret != 8)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = write(this->_eventTime,&outStream);
+       if (ret != 8) {
+
                return false;
        }
 
-       ret = write(this->_entryDate);
-       if (ret != 8)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = write(this->_entryDate,&outStream);
+       if (ret != 8) {
                return false;
        }
 
-       ret = write(this->_eventDuration);
-       if (ret != 8)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = write(this->_eventDuration,&outStream);
+       if (ret != 8) {
+
                return false;
        }
 
-       ret = write(this->_lineageStartDate);
-       if (ret != 8)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = write(this->_lineageStartDate,&outStream);
+       if (ret != 8) {
+
                return false;
        }
 
-       ret = writeUTF(this->_componentId);
-       if (ret <= 0)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = writeUTF(this->_componentId,&outStream);
+       if (ret <= 0) {
+
                return false;
        }
 
-       ret = writeUTF(this->_componentType);
-       if (ret <= 0)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = writeUTF(this->_componentType,&outStream);
+       if (ret <= 0) {
+
                return false;
        }
 
-       ret = writeUTF(this->_uuid);
-       if (ret <= 0)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = writeUTF(this->_uuid,&outStream);
+       if (ret <= 0) {
+
                return false;
        }
 
-       ret = writeUTF(this->_details);
-       if (ret <= 0)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = writeUTF(this->_details,&outStream);
+       if (ret <= 0) {
+
                return false;
        }
 
        // write flow attributes
        uint32_t numAttributes = this->_attributes.size();
-       ret = write(numAttributes);
-       if (ret != 4)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = write(numAttributes,&outStream);
+       if (ret != 4) {
+
                return false;
        }
 
-       std::map<std::string, std::string>::iterator itAttribute;
-       for (itAttribute = this->_attributes.begin(); itAttribute!= 
this->_attributes.end(); itAttribute++)
-       {
-               ret = writeUTF(itAttribute->first, true);
-               if (ret <= 0)
-               {
-                       delete[] _serializedBuf;
-                       _serializedBuf = NULL;
+       for (auto itAttribute : _attributes) {
+               ret = writeUTF(itAttribute.first,&outStream, true);
+               if (ret <= 0) {
+
                        return false;
                }
-               ret = writeUTF(itAttribute->second, true);
-               if (ret <= 0)
-               {
-                       delete[] _serializedBuf;
-                       _serializedBuf = NULL;
+               ret = writeUTF(itAttribute.second,&outStream, true);
+               if (ret <= 0) {
+
                        return false;
                }
        }
 
-       ret = writeUTF(this->_contentFullPath);
-       if (ret <= 0)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = writeUTF(this->_contentFullPath,&outStream);
+       if (ret <= 0) {
+
                return false;
        }
 
-       ret = write(this->_size);
-       if (ret != 8)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = write(this->_size,&outStream);
+       if (ret != 8) {
+
                return false;
        }
 
-       ret = write(this->_offset);
-       if (ret != 8)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = write(this->_offset,&outStream);
+       if (ret != 8) {
+
                return false;
        }
 
-       ret = writeUTF(this->_sourceQueueIdentifier);
-       if (ret <= 0)
-       {
-               delete[] _serializedBuf;
-               _serializedBuf = NULL;
+       ret = writeUTF(this->_sourceQueueIdentifier,&outStream);
+       if (ret <= 0) {
+
                return false;
        }
 
-       if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType 
== ProvenanceEventRecord::CLONE || this->_eventType == 
ProvenanceEventRecord::JOIN)
-       {
+       if (this->_eventType == ProvenanceEventRecord::FORK
+                       || this->_eventType == ProvenanceEventRecord::CLONE
+                       || this->_eventType == ProvenanceEventRecord::JOIN) {
                // write UUIDs
                uint32_t number = this->_parentUuids.size();
-               ret = write(number);
-               if (ret != 4)
-               {
-                       delete[] _serializedBuf;
-                       _serializedBuf = NULL;
+               ret = write(number,&outStream);
+               if (ret != 4) {
+
                        return false;
                }
-               std::vector<std::string>::iterator it;
-               for (it = this->_parentUuids.begin(); it!= 
this->_parentUuids.end(); it++)
-               {
-                       std::string parentUUID = *it;
-                       ret = writeUTF(parentUUID);
-                       if (ret <= 0)
-                       {
-                               delete[] _serializedBuf;
-                               _serializedBuf = NULL;
+               for (auto parentUUID : _parentUuids) {
+                       ret = writeUTF(parentUUID,&outStream);
+                       if (ret <= 0) {
+
                                return false;
                        }
                }
                number = this->_childrenUuids.size();
-               ret = write(number);
-               if (ret != 4)
-               {
-                       delete[] _serializedBuf;
-                       _serializedBuf = NULL;
+               ret = write(number,&outStream);
+               if (ret != 4) {
                        return false;
                }
-               for (it = this->_childrenUuids.begin(); it!= 
this->_childrenUuids.end(); it++)
-               {
-                       std::string childUUID = *it;
-                       ret = writeUTF(childUUID);
-                       if (ret <= 0)
-                       {
-                               delete[] _serializedBuf;
-                               _serializedBuf = NULL;
+               for (auto childUUID : _childrenUuids) {
+                       ret = writeUTF(childUUID,&outStream);
+                       if (ret <= 0) {
+
                                return false;
                        }
                }
-       }
-       else if (this->_eventType == ProvenanceEventRecord::SEND || 
this->_eventType == ProvenanceEventRecord::FETCH)
-       {
-               ret = writeUTF(this->_transitUri);
-               if (ret <= 0)
-               {
-                       delete[] _serializedBuf;
-                       _serializedBuf = NULL;
+       } else if (this->_eventType == ProvenanceEventRecord::SEND
+                       || this->_eventType == ProvenanceEventRecord::FETCH) {
+               ret = writeUTF(this->_transitUri,&outStream);
+               if (ret <= 0) {
+
                        return false;
                }
-       }
-       else if (this->_eventType == ProvenanceEventRecord::RECEIVE)
-       {
-               ret = writeUTF(this->_transitUri);
-               if (ret <= 0)
-               {
-                       delete[] _serializedBuf;
-                       _serializedBuf = NULL;
+       } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
+               ret = writeUTF(this->_transitUri,&outStream);
+               if (ret <= 0) {
+
                        return false;
                }
-               ret = writeUTF(this->_sourceSystemFlowFileIdentifier);
-               if (ret <= 0)
-               {
-                       delete[] _serializedBuf;
-                       _serializedBuf = NULL;
+               ret = 
writeUTF(this->_sourceSystemFlowFileIdentifier,&outStream);
+               if (ret <= 0) {
+
                        return false;
                }
        }
 
        // Persistent to the DB
-       if (repo->Put(_eventIdStr, _serializedBuf, _serializeBufSize))
-       {
-               _logger->log_debug("NiFi Provenance Store event %s size %d 
success", _eventIdStr.c_str(), _serializeBufSize);
-       }
-       else
-       {
-               _logger->log_error("NiFi Provenance Store event %s size %d 
fail", _eventIdStr.c_str(), _serializeBufSize);
+       if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), 
outStream.getSize())) {
+               _logger->log_debug("NiFi Provenance Store event %s size %d 
success",
+                               _eventIdStr.c_str(), outStream.getSize());
+       } else {
+               _logger->log_error("NiFi Provenance Store event %s size %d 
fail",
+                               _eventIdStr.c_str(), outStream.getSize());
        }
 
        // cleanup
-       delete[] (_serializedBuf);
-       _serializedBuf = NULL;
-       _serializeBufSize = 0;
 
        return true;
 }
 
-bool ProvenanceEventRecord::DeSerialize(uint8_t *buffer, int bufferSize)
-{
-       _serializedBuf = buffer;
-       _serializeBufSize = 0;
-       _maxSerializeBufSize = bufferSize;
+bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int 
bufferSize) {
 
        int ret;
 
-       ret = readUTF(this->_eventIdStr);
-       if (ret <= 0)
-       {
+       DataStream outStream(buffer,bufferSize);
+
+       ret = readUTF(this->_eventIdStr,&outStream);
+
+       if (ret <= 0) {
                return false;
        }
 
        uint32_t eventType;
-       ret = read(eventType);
-       if (ret != 4)
-       {
+       ret = read(eventType,&outStream);
+       if (ret != 4) {
                return false;
        }
        this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) 
eventType;
 
-       ret = read(this->_eventTime);
-       if (ret != 8)
-       {
+       ret = read(this->_eventTime,&outStream);
+       if (ret != 8) {
                return false;
        }
 
-       ret = read(this->_entryDate);
-       if (ret != 8)
-       {
+       ret = read(this->_entryDate,&outStream);
+       if (ret != 8) {
                return false;
        }
 
-       ret = read(this->_eventDuration);
-       if (ret != 8)
-       {
+       ret = read(this->_eventDuration,&outStream);
+       if (ret != 8) {
                return false;
        }
 
-       ret = read(this->_lineageStartDate);
-       if (ret != 8)
-       {
+       ret = read(this->_lineageStartDate,&outStream);
+       if (ret != 8) {
                return false;
        }
 
-       ret = readUTF(this->_componentId);
-       if (ret <= 0)
-       {
+       ret = readUTF(this->_componentId,&outStream);
+       if (ret <= 0) {
                return false;
        }
 
-       ret = readUTF(this->_componentType);
-       if (ret <= 0)
-       {
+       ret = readUTF(this->_componentType,&outStream);
+       if (ret <= 0) {
                return false;
        }
 
-       ret = readUTF(this->_uuid);
-       if (ret <= 0)
-       {
+       ret = readUTF(this->_uuid,&outStream);
+       if (ret <= 0) {
                return false;
        }
 
-       ret = readUTF(this->_details);
-       if (ret <= 0)
-       {
+       ret = readUTF(this->_details,&outStream);
+
+       if (ret <= 0) {
                return false;
        }
 
        // read flow attributes
        uint32_t numAttributes = 0;
-       ret = read(numAttributes);
-       if (ret != 4)
-       {
+       ret = read(numAttributes,&outStream);
+       if (ret != 4) {
                return false;
        }
 
-       for (uint32_t i = 0; i < numAttributes; i++)
-       {
+       for (uint32_t i = 0; i < numAttributes; i++) {
                std::string key;
-               ret = readUTF(key, true);
-               if (ret <= 0)
-               {
+               ret = readUTF(key,&outStream, true);
+               if (ret <= 0) {
                        return false;
                }
                std::string value;
-               ret = readUTF(value, true);
-               if (ret <= 0)
-               {
+               ret = readUTF(value,&outStream, true);
+               if (ret <= 0) {
                        return false;
                }
                this->_attributes[key] = value;
        }
 
-       ret = readUTF(this->_contentFullPath);
-       if (ret <= 0)
-       {
+       ret = readUTF(this->_contentFullPath,&outStream);
+       if (ret <= 0) {
                return false;
        }
 
-       ret = read(this->_size);
-       if (ret != 8)
-       {
+       ret = read(this->_size,&outStream);
+       if (ret != 8) {
                return false;
        }
 
-       ret = read(this->_offset);
-       if (ret != 8)
-       {
+       ret = read(this->_offset,&outStream);
+       if (ret != 8) {
                return false;
        }
 
-       ret = readUTF(this->_sourceQueueIdentifier);
-       if (ret <= 0)
-       {
+       ret = readUTF(this->_sourceQueueIdentifier,&outStream);
+       if (ret <= 0) {
                return false;
        }
 
-       if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType 
== ProvenanceEventRecord::CLONE || this->_eventType == 
ProvenanceEventRecord::JOIN)
-       {
+       if (this->_eventType == ProvenanceEventRecord::FORK
+                       || this->_eventType == ProvenanceEventRecord::CLONE
+                       || this->_eventType == ProvenanceEventRecord::JOIN) {
                // read UUIDs
                uint32_t number = 0;
-               ret = read(number);
-               if (ret != 4)
-               {
+               ret = read(number,&outStream);
+               if (ret != 4) {
                        return false;
                }
-               for (uint32_t i = 0; i < number; i++)
-               {
+
+
+               for (uint32_t i = 0; i < number; i++) {
                        std::string parentUUID;
-                       ret = readUTF(parentUUID);
-                       if (ret <= 0)
-                       {
+                       ret = readUTF(parentUUID,&outStream);
+                       if (ret <= 0) {
                                return false;
                        }
                        this->addParentUuid(parentUUID);
                }
                number = 0;
-               ret = read(number);
-               if (ret != 4)
-               {
+               ret = read(number,&outStream);
+               if (ret != 4) {
                        return false;
                }
-               for (uint32_t i = 0; i < number; i++)
-               {
+               for (uint32_t i = 0; i < number; i++) {
                        std::string childUUID;
-                       ret = readUTF(childUUID);
-                       if (ret <= 0)
-                       {
+                       ret = readUTF(childUUID,&outStream);
+                       if (ret <= 0) {
                                return false;
                        }
                        this->addChildUuid(childUUID);
                }
-       }
-       else if (this->_eventType == ProvenanceEventRecord::SEND || 
this->_eventType == ProvenanceEventRecord::FETCH)
-       {
-               ret = readUTF(this->_transitUri);
-               if (ret <= 0)
-               {
+       } else if (this->_eventType == ProvenanceEventRecord::SEND
+                       || this->_eventType == ProvenanceEventRecord::FETCH) {
+               ret = readUTF(this->_transitUri,&outStream);
+               if (ret <= 0) {
                        return false;
                }
-       }
-       else if (this->_eventType == ProvenanceEventRecord::RECEIVE)
-       {
-               ret = readUTF(this->_transitUri);
-               if (ret <= 0)
-               {
+       } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
+               ret = readUTF(this->_transitUri,&outStream);
+               if (ret <= 0) {
                        return false;
                }
-               ret = readUTF(this->_sourceSystemFlowFileIdentifier);
-               if (ret <= 0)
-               {
+               ret = readUTF(this->_sourceSystemFlowFileIdentifier,&outStream);
+               if (ret <= 0) {
                        return false;
                }
        }
@@ -666,35 +387,32 @@ bool ProvenanceEventRecord::DeSerialize(uint8_t *buffer, 
int bufferSize)
        return true;
 }
 
-void ProvenanceReporter::commit()
-{
-       for (std::set<ProvenanceEventRecord*>::iterator it = _events.begin(); 
it != _events.end(); ++it)
-       {
-               ProvenanceEventRecord *event = (ProvenanceEventRecord *) (*it);
-               if 
(!FlowController::getFlowController()->getProvenanceRepository()->isFull())
-                       
event->Serialize(FlowController::getFlowController()->getProvenanceRepository());
-               else
+void ProvenanceReporter::commit() {
+       for (auto event : _events) {
+               if 
(!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull())
 {
+                       event->Serialize(
+                                       
FlowControllerFactory::getFlowController()->getProvenanceRepository());
+               } else {
                        _logger->log_debug("Provenance Repository is full");
+               }
        }
 }
 
-void ProvenanceReporter::create(FlowFileRecord *flow, std::string detail)
-{
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE, 
flow);
+void ProvenanceReporter::create(FlowFileRecord *flow, std::string detail) {
+       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE,
+                       flow);
 
-       if (event)
-       {
+       if (event) {
                event->setDetails(detail);
                add(event);
        }
 }
 
-void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation, 
std::string detail, uint64_t processingDuration)
-{
+void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation,
+               std::string detail, uint64_t processingDuration) {
        ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ROUTE, 
flow);
 
-       if (event)
-       {
+       if (event) {
                event->setDetails(detail);
                event->setRelationship(relation.getName());
                event->setEventDuration(processingDuration);
@@ -702,51 +420,49 @@ void ProvenanceReporter::route(FlowFileRecord *flow, 
Relationship relation, std:
        }
 }
 
-void ProvenanceReporter::modifyAttributes(FlowFileRecord *flow, std::string 
detail)
-{
-       ProvenanceEventRecord *event = 
allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
+void ProvenanceReporter::modifyAttributes(FlowFileRecord *flow,
+               std::string detail) {
+       ProvenanceEventRecord *event = allocate(
+                       ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
 
-       if (event)
-       {
+       if (event) {
                event->setDetails(detail);
                add(event);
        }
 }
 
-void ProvenanceReporter::modifyContent(FlowFileRecord *flow, std::string 
detail, uint64_t processingDuration)
-{
-       ProvenanceEventRecord *event = 
allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow);
+void ProvenanceReporter::modifyContent(FlowFileRecord *flow, std::string 
detail,
+               uint64_t processingDuration) {
+       ProvenanceEventRecord *event = allocate(
+                       ProvenanceEventRecord::CONTENT_MODIFIED, flow);
 
-       if (event)
-       {
+       if (event) {
                event->setDetails(detail);
                event->setEventDuration(processingDuration);
                add(event);
        }
 }
 
-void ProvenanceReporter::clone(FlowFileRecord *parent, FlowFileRecord *child)
-{
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE, 
parent);
+void ProvenanceReporter::clone(FlowFileRecord *parent, FlowFileRecord *child) {
+       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE,
+                       parent);
 
-       if (event)
-       {
+       if (event) {
                event->addChildFlowFile(child);
                event->addParentFlowFile(parent);
                add(event);
        }
 }
 
-void ProvenanceReporter::join(std::vector<FlowFileRecord *> parents, 
FlowFileRecord *child, std::string detail, uint64_t processingDuration)
-{
+void ProvenanceReporter::join(std::vector<FlowFileRecord *> parents,
+               FlowFileRecord *child, std::string detail,
+               uint64_t processingDuration) {
        ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::JOIN, 
child);
 
-       if (event)
-       {
+       if (event) {
                event->addChildFlowFile(child);
                std::vector<FlowFileRecord *>::iterator it;
-               for (it = parents.begin(); it!= parents.end(); it++)
-               {
+               for (it = parents.begin(); it != parents.end(); it++) {
                        FlowFileRecord *record = *it;
                        event->addParentFlowFile(record);
                }
@@ -756,16 +472,16 @@ void ProvenanceReporter::join(std::vector<FlowFileRecord 
*> parents, FlowFileRec
        }
 }
 
-void ProvenanceReporter::fork(std::vector<FlowFileRecord *> child, 
FlowFileRecord *parent, std::string detail, uint64_t processingDuration)
-{
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK, 
parent);
+void ProvenanceReporter::fork(std::vector<FlowFileRecord *> child,
+               FlowFileRecord *parent, std::string detail,
+               uint64_t processingDuration) {
+       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK,
+                       parent);
 
-       if (event)
-       {
+       if (event) {
                event->addParentFlowFile(parent);
                std::vector<FlowFileRecord *>::iterator it;
-               for (it = child.begin(); it!= child.end(); it++)
-               {
+               for (it = child.begin(); it != child.end(); it++) {
                        FlowFileRecord *record = *it;
                        event->addChildFlowFile(record);
                }
@@ -775,71 +491,66 @@ void ProvenanceReporter::fork(std::vector<FlowFileRecord 
*> child, FlowFileRecor
        }
 }
 
-void ProvenanceReporter::expire(FlowFileRecord *flow, std::string detail)
-{
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE, 
flow);
+void ProvenanceReporter::expire(FlowFileRecord *flow, std::string detail) {
+       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE,
+                       flow);
 
-       if (event)
-       {
+       if (event) {
                event->setDetails(detail);
                add(event);
        }
 }
 
-void ProvenanceReporter::drop(FlowFileRecord *flow, std::string reason)
-{
+void ProvenanceReporter::drop(FlowFileRecord *flow, std::string reason) {
        ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::DROP, 
flow);
 
-       if (event)
-       {
+       if (event) {
                std::string dropReason = "Discard reason: " + reason;
                event->setDetails(dropReason);
                add(event);
        }
 }
 
-void ProvenanceReporter::send(FlowFileRecord *flow, std::string transitUri, 
std::string detail, uint64_t processingDuration, bool force)
-{
+void ProvenanceReporter::send(FlowFileRecord *flow, std::string transitUri,
+               std::string detail, uint64_t processingDuration, bool force) {
        ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::SEND, 
flow);
 
-       if (event)
-       {
+       if (event) {
                event->setTransitUri(transitUri);
                event->setDetails(detail);
                event->setEventDuration(processingDuration);
-               if (!force)
-               {
+               if (!force) {
                        add(event);
-               }
-               else
-               {
-                       if 
(!FlowController::getFlowController()->getProvenanceRepository()->isFull())
-                               
event->Serialize(FlowController::getFlowController()->getProvenanceRepository());
+               } else {
+                       if 
(!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull())
+                               event->Serialize(
+                                               
FlowControllerFactory::getFlowController()->getProvenanceRepository());
                        delete event;
                }
        }
 }
 
-void ProvenanceReporter::receive(FlowFileRecord *flow, std::string transitUri, 
std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t 
processingDuration)
-{
-       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE, 
flow);
+void ProvenanceReporter::receive(FlowFileRecord *flow, std::string transitUri,
+               std::string sourceSystemFlowFileIdentifier, std::string detail,
+               uint64_t processingDuration) {
+       ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE,
+                       flow);
 
-       if (event)
-       {
+       if (event) {
                event->setTransitUri(transitUri);
                event->setDetails(detail);
                event->setEventDuration(processingDuration);
-               
event->setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier);
+               event->setSourceSystemFlowFileIdentifier(
+                               sourceSystemFlowFileIdentifier);
                add(event);
        }
 }
 
-void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri, 
std::string detail, uint64_t processingDuration)
-{
+void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri,
+               std::string detail, uint64_t processingDuration) {
        ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FETCH, 
flow);
 
-       if (event)
-       {
+       if (event) {
                event->setTransitUri(transitUri);
                event->setDetails(detail);
                event->setEventDuration(processingDuration);
@@ -849,8 +560,7 @@ void ProvenanceReporter::fetch(FlowFileRecord *flow, 
std::string transitUri, std
 
 uint64_t ProvenanceRepository::_repoSize = 0;
 
-void ProvenanceRepository::start()
-{
+void ProvenanceRepository::start() {
        if (this->_purgePeriod <= 0)
                return;
        if (_running)
@@ -861,48 +571,47 @@ void ProvenanceRepository::start()
        _thread->detach();
 }
 
-void ProvenanceRepository::stop()
-{
+void ProvenanceRepository::stop() {
        if (!_running)
                return;
        _running = false;
        _logger->log_info("ProvenanceRepository Monitor Thread Stop");
 }
 
-void ProvenanceRepository::run(ProvenanceRepository *repo)
-{
+void ProvenanceRepository::run(ProvenanceRepository *repo) {
        // threshold for purge
-       uint64_t purgeThreshold = repo->_maxPartitionBytes*3/4;
-       while (repo->_running)
-       {
-               
std::this_thread::sleep_for(std::chrono::milliseconds(repo->_purgePeriod));
+       uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4;
+       while (repo->_running) {
+               std::this_thread::sleep_for(
+                               std::chrono::milliseconds(repo->_purgePeriod));
                uint64_t curTime = getTimeMillis();
                uint64_t size = repo->repoSize();
-               if (size >= purgeThreshold)
-               {
+               if (size >= purgeThreshold) {
                        std::vector<std::string> purgeList;
-                       leveldb::Iterator* it = 
repo->_db->NewIterator(leveldb::ReadOptions());
-                       for (it->SeekToFirst(); it->Valid(); it->Next())
-                       {
+                       leveldb::Iterator* it = repo->_db->NewIterator(
+                                       leveldb::ReadOptions());
+                       for (it->SeekToFirst(); it->Valid(); it->Next()) {
                                ProvenanceEventRecord eventRead;
                                std::string key = it->key().ToString();
-                               if (eventRead.DeSerialize((uint8_t 
*)it->value().data(), (int) it->value().size()))
-                               {
-                                       if ((curTime - 
eventRead.getEventTime()) > repo->_maxPartitionMillis)
+                               if (eventRead.DeSerialize((uint8_t *) 
it->value().data(),
+                                               (int) it->value().size())) {
+                                       if ((curTime - eventRead.getEventTime())
+                                                       > 
repo->_maxPartitionMillis)
                                                purgeList.push_back(key);
-                               }
-                               else
-                               {
-                                       repo->_logger->log_debug("NiFi 
Provenance retrieve event %s fail", key.c_str());
+                               } else {
+                                       repo->_logger->log_debug(
+                                                       "NiFi Provenance 
retrieve event %s fail",
+                                                       key.c_str());
                                        purgeList.push_back(key);
                                }
                        }
                        delete it;
                        std::vector<std::string>::iterator itPurge;
-                       for (itPurge = purgeList.begin(); itPurge!= 
purgeList.end(); itPurge++)
-                       {
+                       for (itPurge = purgeList.begin(); itPurge != 
purgeList.end();
+                                       itPurge++) {
                                std::string eventId = *itPurge;
-                               repo->_logger->log_info("ProvenanceRepository 
Repo Purge %s", eventId.c_str());
+                               repo->_logger->log_info("ProvenanceRepository 
Repo Purge %s",
+                                               eventId.c_str());
                                repo->Delete(eventId);
                        }
                }
@@ -914,6 +623,3 @@ void ProvenanceRepository::run(ProvenanceRepository *repo)
        return;
 }
 
-
-
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/PutFile.cpp b/libminifi/src/PutFile.cpp
index 3f209ce..e5328b9 100644
--- a/libminifi/src/PutFile.cpp
+++ b/libminifi/src/PutFile.cpp
@@ -136,11 +136,13 @@ bool PutFile::putFile(ProcessSession *session, 
FlowFileRecord *flowFile, const s
        if (cb.commit())
        {
                session->transfer(flowFile, Success);
+               return true;
        }
        else
        {
                session->transfer(flowFile, Failure);
        }
+       return false;
 }
 
 PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const 
std::string &destFile)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
index 3c22ac9..a82f647 100644
--- a/libminifi/src/ResourceClaim.cpp
+++ b/libminifi/src/ResourceClaim.cpp
@@ -25,10 +25,14 @@
 
 std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0);
 
+
+std::string ResourceClaim::default_directory_path=DEFAULT_CONTENT_DIRECTORY;
+
 ResourceClaim::ResourceClaim(const std::string contentDirectory)
 : _id(_localResourceClaimNumber.load()),
   _flowFileRecordOwnedCount(0)
 {
+  
        char uuidStr[37];
 
        // Generate the global UUID for the resource claim

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/Serializable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Serializable.cpp b/libminifi/src/Serializable.cpp
new file mode 100644
index 0000000..91330a0
--- /dev/null
+++ b/libminifi/src/Serializable.cpp
@@ -0,0 +1,365 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <iostream>
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <arpa/inet.h>
+#include "Serializable.h"
+
+#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
+
+bool EndiannessCheck::IS_LITTLE = EndiannessCheck::is_little_endian();
+
+#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)),1)
+
+/**
+ * Copies the in-memory bytes of t into a scratch buffer and hands them
+ * to the stream.
+ * @param t value whose object representation is written
+ * @param stream destination stream
+ * @return the result reported by DataStream::writeData
+ */
+template<typename T>
+int Serializable::writeData(const T &t, DataStream *stream) {
+    uint8_t raw[sizeof(T)];
+    std::memcpy(raw, &t, sizeof(T));
+    return stream->writeData(raw, sizeof(T));
+}
+
+/**
+ * Copies the in-memory bytes of t to a caller supplied location.
+ * @param t value whose object representation is copied
+ * @param to_vec destination; must have room for sizeof(T) bytes
+ * @return the number of bytes copied, i.e. sizeof(T)
+ */
+template<typename T>
+int Serializable::writeData(const T &t, uint8_t *to_vec) {
+    std::memcpy(to_vec, &t, sizeof(T));
+    return sizeof(T);
+}
+
+/**
+ * Appends the in-memory bytes of t to the end of to_vec.
+ * @param t value whose object representation is appended
+ * @param to_vec vector that grows by sizeof(T) bytes
+ * @return the number of bytes appended, i.e. sizeof(T)
+ */
+template<typename T>
+int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) {
+    // stage the bytes first so the insert never reads from to_vec itself
+    uint8_t raw[sizeof(T)];
+    std::memcpy(raw, &t, sizeof(T));
+    to_vec.insert(to_vec.end(), raw, raw + sizeof(T));
+    return sizeof(T);
+}
+
+
+
+
+
+/**
+ * Writes a single byte to the stream.
+ * @return the result reported by DataStream::writeData
+ */
+int Serializable::write(uint8_t value, DataStream *stream) {
+    const int written = stream->writeData(&value, 1);
+    return written;
+}
+/**
+ * Writes a single character to the stream as one raw byte.
+ * @return the result reported by DataStream::writeData
+ */
+int Serializable::write(char value, DataStream *stream) {
+    uint8_t *asByte = reinterpret_cast<uint8_t*>(&value);
+    return stream->writeData(asByte, 1);
+}
+
+/**
+ * Writes len bytes starting at value to the stream.
+ * @return the result reported by DataStream::writeData
+ */
+int Serializable::write(uint8_t *value, int len, DataStream *stream) {
+    const int written = stream->writeData(value, len);
+    return written;
+}
+
+/**
+ * Writes a boolean as a single byte (1 for true, 0 for false) by
+ * delegating to the single byte write overload.
+ */
+int Serializable::write(bool value) {
+    uint8_t asByte = value ? 1 : 0;
+    return write(asByte);
+}
+
+/**
+ * Reads one byte from the stream.
+ * @param value receives the byte; only assigned when exactly one byte
+ *              was read
+ * @param stream stream to read from
+ * @return the result reported by DataStream::readData
+ */
+int Serializable::read(uint8_t &value, DataStream *stream) {
+    uint8_t incoming;
+
+    const int status = stream->readData(&incoming, 1);
+    if (status != 1) {
+        return status;
+    }
+    value = incoming;
+    return status;
+}
+
+/**
+ * Reads one byte from the stream and stores it as a character.
+ * @param value receives the character; only assigned when exactly one
+ *              byte was read
+ * @param stream stream to read from
+ * @return the result reported by DataStream::readData
+ */
+int Serializable::read(char &value, DataStream *stream) {
+    uint8_t incoming;
+
+    const int status = stream->readData(&incoming, 1);
+    if (status != 1) {
+        return status;
+    }
+    value = static_cast<char>(incoming);
+    return status;
+}
+
+/**
+ * Reads len bytes from the stream into value.
+ * @return the result reported by DataStream::readData
+ */
+int Serializable::read(uint8_t *value, int len, DataStream *stream) {
+    const int status = stream->readData(value, len);
+    return status;
+}
+
+/**
+ * Reads a 16 bit value from the stream; forwards to
+ * DataStream::readShort.
+ */
+int Serializable::read(uint16_t &value, DataStream *stream,
+        bool is_little_endian) {
+    const int status = stream->readShort(value, is_little_endian);
+    return status;
+}
+
+/**
+ * Reads a 32 bit value from the stream; forwards to
+ * DataStream::readLong.
+ */
+int Serializable::read(uint32_t &value, DataStream *stream,
+        bool is_little_endian) {
+    const int status = stream->readLong(value, is_little_endian);
+    return status;
+}
+/**
+ * Reads a 64 bit value from the stream; forwards to
+ * DataStream::readLongLong.
+ */
+int Serializable::read(uint64_t &value, DataStream *stream,
+        bool is_little_endian) {
+    const int status = stream->readLongLong(value, is_little_endian);
+    return status;
+}
+
+/**
+ * Writes a 32 bit value to the stream.
+ * @param base_value value to write
+ * @param stream destination stream
+ * @param is_little_endian when true the value is passed through htonl
+ *        before its bytes are written
+ * @return the result of writeData
+ */
+int Serializable::write(uint32_t base_value, DataStream *stream,
+        bool is_little_endian) {
+    uint32_t wire_value = base_value;
+    if (is_little_endian) {
+        wire_value = htonl(base_value);
+    }
+    return writeData(wire_value, stream);
+}
+
+/**
+ * Writes a 64 bit value to the stream.
+ * @param base_value value to write
+ * @param stream destination stream
+ * @param is_little_endian when true the value is passed through
+ *        htonll_r before its bytes are written
+ * @return the result of writeData
+ */
+int Serializable::write(uint64_t base_value, DataStream *stream,
+        bool is_little_endian) {
+    uint64_t wire_value = base_value;
+    if (is_little_endian) {
+        wire_value = htonll_r(base_value);
+    }
+    return writeData(wire_value, stream);
+}
+
+/**
+ * Writes a 16 bit value to the stream.
+ * @param base_value value to write
+ * @param stream destination stream
+ * @param is_little_endian when true the value is passed through htons
+ *        before its bytes are written
+ * @return the result of writeData
+ */
+int Serializable::write(uint16_t base_value, DataStream *stream,
+        bool is_little_endian) {
+    uint16_t wire_value = base_value;
+    if (is_little_endian) {
+        wire_value = htons(base_value);
+    }
+    return writeData(wire_value, stream);
+}
+
+/**
+ * Reads a length-prefixed string from the stream.
+ *
+ * The prefix is a 16 bit length unless widen is set, in which case it is
+ * a 32 bit length. The payload bytes are copied into str verbatim.
+ *
+ * @param str receives the payload; left untouched when the length is
+ *            zero or a read fails
+ * @param stream stream to read from
+ * @param widen true when the length prefix is 32 bits wide
+ * @return the payload length on success, 1 for a zero length string, or
+ *         the non-positive result of the read that failed
+ */
+int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
+    uint32_t utflen = 0;
+    int ret = 1;
+
+    if (!widen) {
+        uint16_t shortLength = 0;
+        ret = read(shortLength, stream);
+        if (ret <= 0)
+            return ret;
+        utflen = shortLength;
+    } else {
+        uint32_t len = 0;
+        ret = read(len, stream);
+        if (ret <= 0)
+            return ret;
+        utflen = len;
+    }
+
+    if (utflen == 0)
+        return 1;
+
+    std::vector<uint8_t> buf;
+    ret = stream->readData(buf, utflen);
+    // A truncated stream leaves buf empty; report the failure instead of
+    // building a string from memory that was never filled.
+    if (ret <= 0)
+        return ret;
+    if (buf.size() < utflen)
+        return -1;
+
+    // Only the first utflen bytes of buf belong to this string.
+    str.assign(reinterpret_cast<const char*>(buf.data()), utflen);
+
+    return utflen;
+}
+
+/**
+ * Writes str to the stream as a length-prefixed byte sequence.
+ *
+ * Bytes in the range 1..127 are written unchanged; every other byte
+ * (including NUL) is expanded to a two byte sequence. The prefix holds
+ * the encoded length: 16 bits by default, 32 bits when widen is set.
+ *
+ * @param str string to encode
+ * @param stream destination stream
+ * @param widen true to emit a 32 bit length prefix
+ * @return -1 if the encoded form exceeds 65535 bytes, 1 for an empty
+ *         string, otherwise the result of writing the payload
+ */
+int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) {
+    // Work on unsigned bytes so values >= 0x80 are not sign extended by
+    // the shifts and masks below on platforms where char is signed.
+    uint32_t utflen = 0;
+    for (auto c : str) {
+        const uint8_t octet = static_cast<uint8_t>(c);
+        utflen += IS_ASCII(octet) ? 1 : 2;
+    }
+
+    if (utflen > 65535)
+        return -1;
+
+    if (utflen == 0) {
+        // Always emit the prefix so readUTF finds a length to consume.
+        if (!widen) {
+            uint16_t shortLen = 0;
+            write(shortLen, stream);
+        } else {
+            write(utflen, stream);
+        }
+        return 1;
+    }
+
+    std::vector<uint8_t> utf_to_write;
+    utf_to_write.reserve(utflen);
+    for (auto c : str) {
+        const uint8_t octet = static_cast<uint8_t>(c);
+        if (IS_ASCII(octet)) {
+            utf_to_write.push_back(octet);
+        } else {
+            utf_to_write.push_back(
+                    static_cast<uint8_t>(((octet >> 0x06) & 31) | 192));
+            utf_to_write.push_back(
+                    static_cast<uint8_t>((octet & 63) | 128));
+        }
+    }
+
+    if (!widen) {
+        uint16_t short_length = utflen;
+        write(short_length, stream);
+    } else {
+        // The prefix is the payload length. The buffer holds exactly
+        // utflen bytes, so neither the prefix nor the write below may
+        // claim more than that.
+        write(utflen, stream);
+    }
+    return stream->writeData(utf_to_write.data(), utflen);
+}
+
+/**
+ * Appends size bytes starting at value to the end of the internal buffer.
+ * @param value first byte to append
+ * @param size number of bytes to append
+ * @return size
+ */
+int DataStream::writeData(uint8_t *value, int size) {
+    buffer.insert(buffer.end(), value, value + size);
+    return size;
+}
+
+/**
+ * Reads eight bytes at the current read position and assembles them into
+ * a 64 bit value.
+ * @param value receives the assembled value
+ * @param is_little_endian when true the first byte read is the most
+ *        significant one, otherwise it is the least significant one
+ * @return 8 on success, -1 when fewer than eight bytes remain
+ */
+int DataStream::readLongLong(uint64_t &value, bool is_little_endian) {
+    if ((8 + readBuffer) > buffer.size()) {
+        // not enough data left
+        return -1;
+    }
+    uint8_t *buf = &buffer[readBuffer];
+
+    uint64_t assembled = 0;
+    for (int i = 0; i < 8; ++i) {
+        const int shift = is_little_endian ? (56 - 8 * i) : (8 * i);
+        assembled |= static_cast<uint64_t>(buf[i]) << shift;
+    }
+    value = assembled;
+
+    readBuffer += 8;
+    return 8;
+}
+
+/**
+ * Reads four bytes at the current read position and assembles them into
+ * a 32 bit value.
+ * @param value receives the assembled value
+ * @param is_little_endian when true the first byte read is the most
+ *        significant one, otherwise it is the least significant one
+ * @return 4 on success, -1 when fewer than four bytes remain
+ */
+int DataStream::readLong(uint32_t &value, bool is_little_endian) {
+    if ((4 + readBuffer) > buffer.size()) {
+        // if read exceed
+        return -1;
+    }
+    const uint8_t *buf = &buffer[readBuffer];
+
+    // Widen each byte to uint32_t before shifting: shifting the promoted
+    // int by 24 would push bytes >= 0x80 into the sign bit.
+    const uint32_t b0 = buf[0];
+    const uint32_t b1 = buf[1];
+    const uint32_t b2 = buf[2];
+    const uint32_t b3 = buf[3];
+
+    if (is_little_endian) {
+        value = (b0 << 24) | (b1 << 16) | (b2 << 8) | b3;
+    } else {
+        value = b0 | (b1 << 8) | (b2 << 16) | (b3 << 24);
+    }
+    readBuffer += 4;
+    return 4;
+}
+
+/**
+ * Reads two bytes at the current read position and assembles them into
+ * a 16 bit value.
+ * @param value receives the assembled value
+ * @param is_little_endian when true the first byte read is the most
+ *        significant one, otherwise it is the least significant one
+ * @return 2 on success, -1 when fewer than two bytes remain
+ */
+int DataStream::readShort(uint16_t &value, bool is_little_endian) {
+    if ((2 + readBuffer) > buffer.size()) {
+        // not enough data left
+        return -1;
+    }
+    uint8_t *buf = &buffer[readBuffer];
+
+    const int first = buf[0];
+    const int second = buf[1];
+    value = is_little_endian ? ((first << 8) | second)
+                             : (first | (second << 8));
+
+    readBuffer += 2;
+    return 2;
+}
+
+/**
+ * Copies the next buflen bytes of the stream into buf and advances the
+ * read position.
+ * @param buf receives the bytes; any previous content is replaced so
+ *            that buf holds exactly buflen elements afterwards
+ * @param buflen number of bytes to read
+ * @return buflen on success, -1 when buflen is negative or more bytes
+ *         are requested than remain in the stream
+ */
+int DataStream::readData(std::vector<uint8_t> &buf, int buflen) {
+    if (buflen < 0 || (buflen + readBuffer) > buffer.size()) {
+        // negative length, or the read would run past the stored data
+        return -1;
+    }
+
+    // Replace rather than resize-and-insert: the latter left buf holding
+    // the data followed by stale or zero filled elements.
+    buf.clear();
+    if (buflen > 0) {
+        // Derive the end by pointer arithmetic; indexing buffer at
+        // readBuffer + buflen is out of range for a read up to the end.
+        const uint8_t *first = &buffer[readBuffer];
+        buf.assign(first, first + buflen);
+    }
+
+    readBuffer += buflen;
+    return buflen;
+}
+
+
+/**
+ * Copies the next buflen bytes of the stream into buf and advances the
+ * read position.
+ * @param buf destination; must have room for buflen bytes
+ * @param buflen number of bytes to read
+ * @return buflen on success, -1 when buflen is negative or more bytes
+ *         are requested than remain in the stream
+ */
+int DataStream::readData(uint8_t *buf, int buflen) {
+    if (buflen < 0 || (buflen + readBuffer) > buffer.size()) {
+        // negative length, or the read would run past the stored data
+        return -1;
+    }
+
+    if (buflen > 0) {
+        // Derive the end by pointer arithmetic; indexing buffer at
+        // readBuffer + buflen is out of range for a read up to the end.
+        const uint8_t *first = &buffer[readBuffer];
+        std::copy(first, first + buflen, buf);
+    }
+
+    readBuffer += buflen;
+    return buflen;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp
index fb20767..3d6166b 100644
--- a/libminifi/src/Site2SitePeer.cpp
+++ b/libminifi/src/Site2SitePeer.cpp
@@ -138,7 +138,7 @@ bool Site2SitePeer::Open()
        }
 
        // OpenSSL init
-       SSL_CTX *ctx = FlowController::getFlowController()->getSSLContext();
+       SSL_CTX *ctx = 
FlowControllerFactory::getFlowController()->getSSLContext();
        if (ctx)
        {
                // we have s2s secure config

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/test/Server.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp
index f7bd3dd..65245f6 100644
--- a/libminifi/test/Server.cpp
+++ b/libminifi/test/Server.cpp
@@ -15,7 +15,6 @@
 #include <errno.h>
 #include <chrono>
 #include <thread>
-#include <iostream>     // std::cout
 #include <fstream>      // std::ifstream
 #include <signal.h>
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index a0950e1..4e7e73f 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -18,8 +18,68 @@
 
 #ifndef LIBMINIFI_TEST_TESTBASE_H_
 #define LIBMINIFI_TEST_TESTBASE_H_
-
+#include <cstdio>
+#include <cstdlib>
+#include "ResourceClaim.h"
 #include "catch.hpp"
+#include "Logger.h"
+#include <vector>
+
+
+/**
+ * Sets the log level of the shared Logger for the lifetime of a test and
+ * puts it back to info when the controller goes away.
+ */
+class LogTestController {
+public:
+       /**
+        * @param level log level name handed to the logger
+        */
+       LogTestController(const std::string level = "debug") {
+               Logger *logger = Logger::getLogger();
+               logger->setLogLevel(level);
+       }
+
+       /** Switches the logger to the debug level. */
+       void enableDebug()
+       {
+               Logger *logger = Logger::getLogger();
+               logger->setLogLevel("debug");
+       }
+
+       /** Restores the info level. */
+       ~LogTestController() {
+               Logger *logger = Logger::getLogger();
+               logger->setLogLevel(LOG_LEVEL_E::info);
+       }
+};
+
+/**
+ * Per-test fixture: sets the logger to info, points the default resource
+ * claim directory at the working directory and removes the temporary
+ * directories it created when it is destroyed.
+ */
+class TestController{
+public:
+
+       TestController() : log("info")
+       {
+               ResourceClaim::default_directory_path = "./";
+       }
+
+       /**
+        * Removes every directory recorded by createTempDirectory. rmdir
+        * only removes empty directories; failures are ignored.
+        */
+       ~TestController()
+       {
+               for(auto dir : directories)
+               {
+                       rmdir(dir);
+               }
+       }
+
+       /** Switches logging to the debug level. */
+       void enableDebug() {
+               log.enableDebug();
+       }
+
+       /**
+        * Creates a unique temporary directory from a mkdtemp template.
+        * @param format writable template ending in XXXXXX; mkdtemp
+        *        rewrites it in place and the returned pointer refers to
+        *        this buffer, so it has to stay valid until the controller
+        *        is destroyed
+        * @return the directory name, or NULL when mkdtemp fails
+        */
+       char *createTempDirectory(char *format)
+       {
+               char *dir = mkdtemp(format);
+               if (dir != NULL)
+               {
+                       // remember it, otherwise the destructor has nothing to remove
+                       directories.push_back(dir);
+               }
+               return dir;
+       }
+
+protected:
+       LogTestController log;
+       std::vector<char*> directories;
+
+
+};
+
+
+
 
 
 #endif /* LIBMINIFI_TEST_TESTBASE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/test/unit/ProcessorTests.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.h 
b/libminifi/test/unit/ProcessorTests.h
index 0cb6f65..0c17824 100644
--- a/libminifi/test/unit/ProcessorTests.h
+++ b/libminifi/test/unit/ProcessorTests.h
@@ -15,12 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <cstdlib>
 #include <uuid/uuid.h>
 #include <fstream>
+#include "FlowController.h"
+#include "ProvenanceTests.h"
 #include "../TestBase.h"
 #include "GetFile.h"
 
+#ifndef PROCESSOR_TESTS
+#define PROCESSOR_TESTS
 
 TEST_CASE("Test Creation of GetFile", "[getfileCreate]"){
        GetFile processor("processorname");
@@ -30,24 +33,33 @@ TEST_CASE("Test Creation of GetFile", "[getfileCreate]"){
 
 TEST_CASE("Test Find file", "[getfileCreate2]"){
 
+       TestController testController;
+
+       testController.enableDebug();
+
+       ProvenanceTestRepository repo;
+       TestFlowController controller(repo);
+       FlowControllerFactory::getFlowController( 
dynamic_cast<FlowController*>(&controller));
 
        GetFile processor("getfileCreate2");
 
        char format[] ="/tmp/gt.XXXXXX";
-       char *dir = mkdtemp(format);
+       char *dir = testController.createTempDirectory(format);
+
 
+       uuid_t processoruuid;
+       REQUIRE( true == processor.getUUID(processoruuid) );
 
-       Connection connection("emptyConnection");
+       Connection connection("getfileCreate2Connection");
        connection.setRelationship(Relationship("success","description"));
 
        // link the connections so that we can test results at the end for this
 
        connection.setSourceProcessor(&processor);
 
-       uuid_t processoruuid;
-       uuid_parse(processor.getUUIDStr().c_str(),processoruuid);
 
        connection.setSourceProcessorUUID(processoruuid);
+       connection.setDestinationProcessorUUID(processoruuid);
 
        processor.addConnection(&connection);
        REQUIRE( dir != NULL );
@@ -64,15 +76,15 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
        processor.onTrigger(&context,&session);
 
        ProvenanceReporter *reporter = session.getProvenanceReporter();
-       std::set<ProvenanceEventRecord *> records = reporter->getEvents();
+       std::set<ProvenanceEventRecord*> records = reporter->getEvents();
 
-    record = session.get();
+       record = session.get();
        REQUIRE( record== 0 );
        REQUIRE( records.size() == 0 );
 
        std::fstream file;
        std::stringstream ss;
-       ss << dir << "/" << "tstFile";
+       ss << dir << "/" << "tstFile.ext";
        file.open(ss.str(),std::ios::out);
        file << "tempFile";
        file.close();
@@ -89,12 +101,54 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){
 
        for(ProvenanceEventRecord *provEventRecord : records)
        {
-
                REQUIRE (provEventRecord->getComponentType() == 
processor.getName());
        }
+       session.commit();
+
+       FlowFileRecord *ffr = session.get();
+
+       ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+
+       delete ffr;
+
+       std::set<FlowFileRecord*> expiredFlows;
+
+       REQUIRE( 2 == repo.getRepoMap().size() );
+
+       for(auto  entry: repo.getRepoMap())
+       {
+               ProvenanceEventRecord newRecord;
+               
newRecord.DeSerialize((uint8_t*)entry.second.data(),entry.second.length());
+
+               bool found = false;
+               for ( auto provRec : records)
+               {
+                       if (provRec->getEventId() == newRecord.getEventId() )
+                       {
+                               REQUIRE( provRec->getEventId() == 
newRecord.getEventId());
+                               REQUIRE( provRec->getComponentId() == 
newRecord.getComponentId());
+                               REQUIRE( provRec->getComponentType() == 
newRecord.getComponentType());
+                               REQUIRE( provRec->getDetails() == 
newRecord.getDetails());
+                               REQUIRE( provRec->getEventDuration() == 
newRecord.getEventDuration());
+                               found = true;
+                               break;
+                       }
+               }
+               if (!found)
+               throw std::runtime_error("Did not find record");
+
+
+       }
+
+
 
 
 
 
 }
 
+
+
+#endif
+
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h 
b/libminifi/test/unit/ProvenanceTestHelper.h
index 2516ed9..f67a826 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -19,10 +19,17 @@
 #define LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_
 
 #include "Provenance.h"
+#include "FlowController.h"
 
+/**
+ * Test repository
+ */
 class ProvenanceTestRepository : public ProvenanceRepository
 {
 public:
+       ProvenanceTestRepository()
+{
+}
                //! initialize
                bool initialize()
                {
@@ -59,9 +66,72 @@ public:
                                return false;
                        }
                }
+
+               // Exposes repositoryResults by const reference so tests can
+               // inspect what the repository currently holds.
+               const std::map<std::string,std::string> &getRepoMap() const
+               {
+                       return repositoryResults;
+               }
+
 protected:
                std::map<std::string,std::string> repositoryResults;
 };
 
 
+/**
+ * Flow controller stub for unit tests. It is wired to a caller supplied
+ * provenance repository and replaces the lifecycle and factory methods
+ * with trivial implementations.
+ */
+class TestFlowController : public FlowController
+{
+public:
+       /**
+        * @param repo repository whose address is stored in
+        *             _provenanceRepo (no copy is made)
+        */
+       TestFlowController(ProvenanceTestRepository &repo) : ::FlowController()
+       {
+               _provenanceRepo = dynamic_cast<ProvenanceRepository*>(&repo);
+       }
+
+       ~TestFlowController() {}
+
+       /** Nothing to load in tests. */
+       void load() {}
+
+       /** Sets the running flag and reports success. */
+       bool start()
+       {
+               _running.store(true);
+               return true;
+       }
+
+       /** Clears the running flag; force is ignored. */
+       void stop(bool force)
+       {
+               _running.store(false);
+       }
+
+       /** Stops immediately; the wait time is ignored. */
+       void waitUnload(const uint64_t timeToWaitMs)
+       {
+               stop(true);
+       }
+
+       /** Stops immediately. */
+       void unload()
+       {
+               stop(true);
+       }
+
+       /** Reloading is a no-op in tests. */
+       void reload(std::string file) {}
+
+       /** Always reports true, independent of the running flag. */
+       bool isRunning()
+       {
+               return true;
+       }
+
+       // The factory methods below create nothing and return a null pointer.
+       Processor *createProcessor(std::string name, uuid_t uuid)
+       {
+               return nullptr;
+       }
+
+       ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid)
+       {
+               return nullptr;
+       }
+
+       ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid)
+       {
+               return nullptr;
+       }
+
+       Connection *createConnection(std::string name, uuid_t uuid)
+       {
+               return nullptr;
+       }
+};
+
+
 #endif /* LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/test/unit/ProvenanceTests.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTests.h 
b/libminifi/test/unit/ProvenanceTests.h
index 63608df..d78de47 100644
--- a/libminifi/test/unit/ProvenanceTests.h
+++ b/libminifi/test/unit/ProvenanceTests.h
@@ -16,6 +16,9 @@
  * limitations under the License.
  */
 
+
+#ifndef PROVENANCE_TESTS
+#define PROVENANCE_TESTS
 #include "../TestBase.h"
 
 #include "ProvenanceTestHelper.h"
@@ -23,14 +26,71 @@
 #include "FlowFileRecord.h"
 
 
-TEST_CASE("Test Provenance record creation", "[TestProvenanceEventRecord]"){
-
-       ProvenanceEventRecord 
record1(ProvenanceEventRecord::ProvenanceEventType::CREATE,"blah","blahblah");
 
+TEST_CASE("Test Provenance record create", "[TestProvenanceEventRecord]"){
 
+       ProvenanceEventRecord 
record1(ProvenanceEventRecord::ProvenanceEventType::CREATE,"blah","blahblah");
        REQUIRE( record1.getAttributes().size() == 0);
        REQUIRE( record1.getAlternateIdentifierUri().length() == 0);
 
 }
 
 
+/**
+ * Round-trips a CREATE provenance record through the test repository and
+ * checks that ids, component fields, details and duration survive.
+ */
+TEST_CASE("Test Provenance record serialization",
+        "[TestProvenanceEventRecordSerializeDeser]"){
+
+       ProvenanceEventRecord record1(
+                       ProvenanceEventRecord::ProvenanceEventType::CREATE,
+                       "componentid","componenttype");
+       std::string eventId = record1.getEventId();
+
+       // fields that are compared after the round trip
+       std::string smileyface = ":)" ;
+       uint64_t sample = 65555;
+       record1.setDetails(smileyface);
+       record1.setEventDuration(sample);
+
+       // the record is serialized through the repository base class
+       ProvenanceTestRepository repo;
+       ProvenanceRepository *testRepository = &repo;
+
+       record1.Serialize(testRepository);
+
+       ProvenanceEventRecord record2;
+       REQUIRE( record2.DeSerialize(testRepository,eventId) == true);
+       REQUIRE( record2.getEventId() == record1.getEventId());
+       REQUIRE( record2.getComponentId() == record1.getComponentId());
+       REQUIRE( record2.getComponentType() == record1.getComponentType());
+       REQUIRE( record2.getDetails() == record1.getDetails());
+       REQUIRE( record2.getDetails() == smileyface);
+       REQUIRE( record2.getEventDuration() == sample);
+}
+
+
+/**
+ * Attaches a child flow file to a CLONE provenance record, round-trips
+ * the record through the test repository and checks that the child uuid
+ * is preserved and can be removed again.
+ */
+TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]"){
+
+       ProvenanceEventRecord record1(
+                       ProvenanceEventRecord::ProvenanceEventType::CLONE,
+                       "componentid","componenttype");
+       std::string eventId = record1.getEventId();
+
+       // flow file with two attributes that becomes the child of record1
+       std::map<std::string, std::string> attributes;
+       attributes["potato"] = "potatoe";
+       attributes["tomato"] = "tomatoe";
+       FlowFileRecord ffr1(attributes);
+
+       record1.addChildFlowFile(&ffr1);
+
+       uint64_t sample = 65555;
+       record1.setEventDuration(sample);
+
+       // the record is serialized through the repository base class
+       ProvenanceTestRepository repo;
+       ProvenanceRepository *testRepository = &repo;
+
+       record1.Serialize(testRepository);
+
+       ProvenanceEventRecord record2;
+       REQUIRE( record2.DeSerialize(testRepository,eventId) == true);
+       REQUIRE( record1.getChildrenUuids().size() == 1);
+       REQUIRE( record2.getChildrenUuids().size() == 1);
+       std::string childId = record2.getChildrenUuids().at(0);
+       REQUIRE( childId == ffr1.getUUIDStr());
+       record2.removeChildUuid(childId);
+       REQUIRE( record2.getChildrenUuids().size() == 0);
+}
+
+
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index ea916cd..9b99ee6 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -17,7 +17,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <fcntl.h>
 #include <stdio.h>
+#include <semaphore.h>
 #include <signal.h>
 #include <vector>
 #include <queue>
@@ -34,7 +36,7 @@
 //! Main thread sleep interval 1 second
 #define SLEEP_INTERVAL 1
 //! Main thread stop wait time
-#define STOP_WAIT_TIME 2
+#define STOP_WAIT_TIME_MS 30*1000
 //! Default YAML location
 #define DEFAULT_NIFI_CONFIG_YML "./conf/config.yml"
 //! Default nifi properties file path
@@ -43,23 +45,32 @@
 #define MINIFI_HOME_ENV_KEY "MINIFI_HOME"
 
 /* Define Parser Values for Configuration YAML sections */
-#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
 #define CONFIG_YAML_PROCESSORS_KEY "Processors"
+#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
 #define CONFIG_YAML_CONNECTIONS_KEY "Connections"
 #define CONFIG_YAML_REMOTE_PROCESSING_GROUPS_KEY "Remote Processing Groups"
 
-//! Whether it is running
-static bool running = false;
+// Variables that allow us to avoid a timed wait.
+sem_t *running;
 //! Flow Controller
 static FlowController *controller = NULL;
 
+/**
+ * Removed the stop command from the signal handler so that we could trigger
+ * unload after we exit the semaphore controlled critical section in main.
+ *
+ * Semaphores are a portable choice when using signal handlers. Threads,
+ * mutexes, and condition variables are not guaranteed to work within
+ * a signal handler. Consequently we will use the semaphore to avoid thread
+ * safety issues.
+ */
 void sigHandler(int signal)
 {
+
        if (signal == SIGINT || signal == SIGTERM)
        {
-               controller->stop(true);
-               sleep(STOP_WAIT_TIME);
-               running = false;
+               // avoid stopping the controller here.
+               sem_post(running);
        }
 }
 
@@ -68,6 +79,19 @@ int main(int argc, char **argv)
        Logger *logger = Logger::getLogger();
        logger->setLogLevel(info);
 
+
+       uint16_t stop_wait_time = STOP_WAIT_TIME_MS;
+
+       std::string graceful_shutdown_seconds = "";
+       std::string configured_log_level = "";
+
+       running = sem_open("MiNiFiMain",O_CREAT,0644,0);
+       if (running == SEM_FAILED || running == 0)
+       {
+
+               logger->log_error("could not initialize semaphore");
+               perror("initialization failure");
+       }
     // assumes POSIX compliant environment
     std::string minifiHome;
     if (const char* env_p = std::getenv(MINIFI_HOME_ENV_KEY))
@@ -85,6 +109,7 @@ int main(int argc, char **argv)
         minifiHome = minifiHomePath.substr(0, 
minifiHomePath.find_last_of("/\\"));     //Remove /bin from path
     }
 
+
        if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, 
sigHandler) == SIG_ERR || signal(SIGPIPE, SIG_IGN) == SIG_ERR)
        {
                logger->log_error("Can not install signal handler");
@@ -95,25 +120,66 @@ int main(int argc, char **argv)
     configure->setHome(minifiHome);
     configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
 
-       controller = FlowController::getFlowController();
+
+    if 
(configure->get(Configure::nifi_graceful_shutdown_seconds,graceful_shutdown_seconds))
+    {
+       try
+       {
+               stop_wait_time = std::stoi(graceful_shutdown_seconds);
+       }
+       catch(const std::out_of_range &e)
+       {
+               logger->log_error("%s is out of range. 
%s",Configure::nifi_graceful_shutdown_seconds,e.what());
+       }
+       catch(const std::invalid_argument &e)
+       {
+               logger->log_error("%s contains an invalid argument set. 
%s",Configure::nifi_graceful_shutdown_seconds,e.what());
+       }
+    }
+    else
+    {
+       logger->log_debug("%s not set, defaulting to 
%d",Configure::nifi_graceful_shutdown_seconds,STOP_WAIT_TIME_MS);
+    }
+
+    if (configure->get(Configure::nifi_log_level,configured_log_level))
+       {
+               std::cout << "log level is " << configured_log_level << 
std::endl;
+                       logger->setLogLevel(configured_log_level);
+
+       }
+
+
+
+       controller = FlowControllerFactory::getFlowController();
 
        // Load flow from specified configuration file
        controller->load();
        // Start Processing the flow
-       controller->start();
-       running = true;
 
+       controller->start();
        logger->log_info("MiNiFi started");
 
-       // main loop
-       while (running)
-       {
-               sleep(SLEEP_INTERVAL);
-       }
+       /**
+        * Sem wait provides us the ability to have a controlled
+        * yield without the need for a more complex construct and
+        * a spin lock. It returns -1 only on failure (errno is set).
+        */
+       if ( sem_wait(running) == -1 )
+               perror("sem_wait");
+
+
+       sem_unlink("MiNiFiMain");
+
+       /**
+        * Trigger unload -- wait stop_wait_time
+        */
+       controller->waitUnload(stop_wait_time);
 
-       controller->unload();
        delete controller;
+
        logger->log_info("MiNiFi exit");
 
+
+
        return 0;
 }

Reply via email to