Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 63dbb8241 -> 09d973baf
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp index 7c98278..b3cb8ac 100644 --- a/libminifi/src/ProcessGroup.cpp +++ b/libminifi/src/ProcessGroup.cpp @@ -135,9 +135,10 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, try { // Start all the processor node, input and output ports - for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) + for(auto processor : _processors) { - Processor *processor(*it); + _logger->log_debug("Starting %s",processor->getName().c_str()); + if (!processor->isRunning() && processor->getScheduledState() != DISABLED) { if (processor->getSchedulingStrategy() == TIMER_DRIVEN) @@ -146,10 +147,9 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, eventScheduler->schedule(processor); } } - - for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) + // Start processing the group + for(auto processGroup : _childProcessGroups) { - ProcessGroup *processGroup(*it); processGroup->startProcessing(timeScheduler, eventScheduler); } } @@ -202,12 +202,14 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, Processor *ProcessGroup::findProcessor(uuid_t uuid) { + Processor *ret = NULL; // std::lock_guard<std::mutex> lock(_mtx); for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) { Processor *processor(*it); + _logger->log_info("find processor %s",processor->getName().c_str()); uuid_t processorUUID; if (processor->getUUID(processorUUID) && uuid_compare(processorUUID, uuid) == 0) return processor; @@ -215,7 +217,9 @@ Processor *ProcessGroup::findProcessor(uuid_t uuid) for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); 
it != _childProcessGroups.end(); ++it) { + ProcessGroup *processGroup(*it); + _logger->log_info("find processor child %s",processGroup->getName().c_str()); Processor *processor = processGroup->findProcessor(uuid); if (processor) return processor; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp index abe75bd..f3d769e 100644 --- a/libminifi/src/ProcessSession.cpp +++ b/libminifi/src/ProcessSession.cpp @@ -204,7 +204,7 @@ void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback) { ResourceClaim *claim = NULL; - claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY); + claim = new ResourceClaim(); try { @@ -382,7 +382,7 @@ void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepS { ResourceClaim *claim = NULL; - claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY); + claim = new ResourceClaim(); char *buf = NULL; int size = 4096; buf = new char [size]; @@ -420,9 +420,10 @@ void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepS } flow->_claim = claim; claim->increaseFlowFileRecordOwnedCount(); - /* + _logger->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", - flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); + fs.close(); input.close(); if (!keepSource) @@ -478,10 +479,9 @@ void ProcessSession::commit() try { // First we clone the flow record based on the transfered relationship for updated flow record - std::map<std::string, FlowFileRecord *>::iterator it; - for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + for (auto && it : _updatedFlowFiles) { - FlowFileRecord *record = it->second; + 
FlowFileRecord *record = it.second; if (record->_markedDelete) continue; std::map<std::string, Relationship>::iterator itRelationship = @@ -537,11 +537,10 @@ void ProcessSession::commit() throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); } } - // Do the samething for added flow file - for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + for(const auto it : _addedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_markedDelete) continue; std::map<std::string, Relationship>::iterator itRelationship = @@ -597,11 +596,10 @@ void ProcessSession::commit() throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); } } - // Complete process the added and update flow files for the session, send the flow file to its queue - for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + for(const auto &it : _updatedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_markedDelete) { continue; @@ -611,9 +609,9 @@ void ProcessSession::commit() else delete record; } - for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + for(const auto &it : _addedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_markedDelete) { continue; @@ -624,9 +622,9 @@ void ProcessSession::commit() delete record; } // Process the clone flow files - for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++) + for(const auto &it : _clonedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_markedDelete) { continue; @@ -637,15 +635,15 @@ void ProcessSession::commit() delete record; } // Delete the deleted flow files - for (it = _deletedFlowFiles.begin(); it!= _deletedFlowFiles.end(); it++) + for(const auto &it : _deletedFlowFiles) { - FlowFileRecord *record = 
it->second; + FlowFileRecord *record = it.second; delete record; } // Delete the snapshot - for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++) + for(const auto &it : _originalFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; delete record; } // All done @@ -675,11 +673,10 @@ void ProcessSession::rollback() { try { - std::map<std::string, FlowFileRecord *>::iterator it; // Requeue the snapshot of the flowfile back - for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++) + for(const auto &it : _originalFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_orginalConnection) { record->_snapshot = false; @@ -690,21 +687,21 @@ void ProcessSession::rollback() } _originalFlowFiles.clear(); // Process the clone flow files - for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++) + for(const auto &it : _clonedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; delete record; } _clonedFlowFiles.clear(); - for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + for(const auto &it : _addedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; delete record; } _addedFlowFiles.clear(); - for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + for(const auto &it : _updatedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; delete record; } _updatedFlowFiles.clear(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp index 6a11893..1b8e286 100644 --- a/libminifi/src/Processor.cpp +++ b/libminifi/src/Processor.cpp @@ -42,7 +42,7 @@ Processor::Processor(std::string name, uuid_t uuid) char uuidStr[37]; 
uuid_unparse(_uuid, uuidStr); _uuidStr = uuidStr; - + _hasWork.store(false); // Setup the default values _state = DISABLED; _strategy = TIMER_DRIVEN; @@ -57,7 +57,6 @@ Processor::Processor(std::string name, uuid_t uuid) _yieldExpiration = 0; _incomingConnectionsIter = this->_incomingConnections.begin(); _logger = Logger::getLogger(); - _logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str()); } @@ -83,9 +82,8 @@ bool Processor::setSupportedProperties(std::set<Property> properties) std::lock_guard<std::mutex> lock(_mtx); _properties.clear(); - for (std::set<Property>::iterator it = properties.begin(); it != properties.end(); ++it) + for (auto item : properties) { - Property item(*it); _properties[item.getName()] = item; _logger->log_info("Processor %s supported property name %s", _name.c_str(), item.getName().c_str()); } @@ -105,9 +103,8 @@ bool Processor::setSupportedRelationships(std::set<Relationship> relationships) std::lock_guard<std::mutex> lock(_mtx); _relationships.clear(); - for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it) + for(auto item : relationships) { - Relationship item(*it); _relationships[item.getName()] = item; _logger->log_info("Processor %s supported relationship name %s", _name.c_str(), item.getName().c_str()); } @@ -127,9 +124,8 @@ bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationsh std::lock_guard<std::mutex> lock(_mtx); _autoTerminatedRelationships.clear(); - for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it) + for(auto item : relationships) { - Relationship item(*it); _autoTerminatedRelationships[item.getName()] = item; _logger->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), item.getName().c_str()); } @@ -140,21 +136,18 @@ bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationsh bool Processor::isAutoTerminated(Relationship relationship) 
{ bool isRun = isRunning(); + + auto conditionalLock = !isRun ? + std::unique_lock<std::mutex>() + : std::unique_lock<std::mutex>(_mtx); - if (!isRun) - _mtx.lock(); - - std::map<std::string, Relationship>::iterator it = _autoTerminatedRelationships.find(relationship.getName()); + const auto &it = _autoTerminatedRelationships.find(relationship.getName()); if (it != _autoTerminatedRelationships.end()) { - if (!isRun) - _mtx.unlock(); return true; } else { - if (!isRun) - _mtx.unlock(); return false; } } @@ -163,20 +156,17 @@ bool Processor::isSupportedRelationship(Relationship relationship) { bool isRun = isRunning(); - if (!isRun) - _mtx.lock(); + auto conditionalLock = !isRun ? + std::unique_lock<std::mutex>() + : std::unique_lock<std::mutex>(_mtx); - std::map<std::string, Relationship>::iterator it = _relationships.find(relationship.getName()); + const auto &it = _relationships.find(relationship.getName()); if (it != _relationships.end()) { - if (!isRun) - _mtx.unlock(); return true; } else { - if (!isRun) - _mtx.unlock(); return false; } } @@ -185,23 +175,20 @@ bool Processor::getProperty(std::string name, std::string &value) { bool isRun = isRunning(); - if (!isRun) - // Because set property only allowed in non running state, we need to obtain lock avoid rack condition - _mtx.lock(); - - std::map<std::string, Property>::iterator it = _properties.find(name); + + auto conditionalLock = !isRun ? 
+ std::unique_lock<std::mutex>() + : std::unique_lock<std::mutex>(_mtx); + + const auto &it = _properties.find(name); if (it != _properties.end()) { Property item = it->second; value = item.getValue(); - if (!isRun) - _mtx.unlock(); return true; } else { - if (!isRun) - _mtx.unlock(); return false; } } @@ -210,7 +197,7 @@ bool Processor::setProperty(std::string name, std::string value) { std::lock_guard<std::mutex> lock(_mtx); - std::map<std::string, Property>::iterator it = _properties.find(name); + auto &&it = _properties.find(name); if (it != _properties.end()) { @@ -254,7 +241,7 @@ std::set<Connection *> Processor::getOutGoingConnections(std::string relationshi { std::set<Connection *> empty; - std::map<std::string, std::set<Connection *>>::iterator it = _outGoingConnections.find(relationship); + auto &&it = _outGoingConnections.find(relationship); if (it != _outGoingConnections.end()) { return _outGoingConnections[relationship]; @@ -269,6 +256,7 @@ bool Processor::addConnection(Connection *connection) { bool ret = false; + if (isRunning()) { _logger->log_info("Can not add connection while the process %s is running", @@ -276,6 +264,7 @@ bool Processor::addConnection(Connection *connection) return false; } + std::lock_guard<std::mutex> lock(_mtx); uuid_t srcUUID; @@ -283,8 +272,14 @@ bool Processor::addConnection(Connection *connection) connection->getSourceProcessorUUID(srcUUID); connection->getDestinationProcessorUUID(destUUID); + char uuid_str[37]; - if (uuid_compare(_uuid, destUUID) == 0) + + uuid_unparse_lower(_uuid, uuid_str); + std::string my_uuid = uuid_str; + uuid_unparse_lower(destUUID, uuid_str); + std::string destination_uuid = uuid_str; + if (my_uuid == destination_uuid) { // Connection is destination to the current processor if (_incomingConnections.find(connection) == _incomingConnections.end()) @@ -297,12 +292,13 @@ bool Processor::addConnection(Connection *connection) ret = true; } } - - if (uuid_compare(_uuid, srcUUID) == 0) + 
uuid_unparse_lower(srcUUID, uuid_str); + std::string source_uuid = uuid_str; + if (my_uuid == source_uuid) { std::string relationship = connection->getRelationship().getName(); // Connection is source from the current processor - std::map<std::string, std::set<Connection *>>::iterator it = + auto &&it = _outGoingConnections.find(relationship); if (it != _outGoingConnections.end()) { @@ -321,6 +317,7 @@ bool Processor::addConnection(Connection *connection) } else { + // We do not have any outgoing connection for this relationship yet std::set<Connection *> newConnection; newConnection.insert(connection); @@ -331,6 +328,7 @@ bool Processor::addConnection(Connection *connection) ret = true; } } + return ret; } @@ -369,7 +367,7 @@ void Processor::removeConnection(Connection *connection) { std::string relationship = connection->getRelationship().getName(); // Connection is source from the current processor - std::map<std::string, std::set<Connection *>>::iterator it = + auto &&it = _outGoingConnections.find(relationship); if (it == _outGoingConnections.end()) { @@ -414,9 +412,8 @@ bool Processor::flowFilesQueued() if (_incomingConnections.size() == 0) return false; - for (std::set<Connection *>::iterator it = _incomingConnections.begin(); it != _incomingConnections.end(); ++it) + for(auto &&connection : _incomingConnections) { - Connection *connection = *it; if (connection->getQueueSize() > 0) return true; } @@ -428,15 +425,12 @@ bool Processor::flowFilesOutGoingFull() { std::lock_guard<std::mutex> lock(_mtx); - std::map<std::string, std::set<Connection *>>::iterator it; - - for (it = _outGoingConnections.begin(); it != _outGoingConnections.end(); ++it) + for(auto &&connection : _outGoingConnections) { // We already has connection for this relationship - std::set<Connection *> existedConnection = it->second; - for (std::set<Connection *>::iterator itConnection = existedConnection.begin(); itConnection != existedConnection.end(); ++itConnection) + std::set<Connection *> 
existedConnection = connection.second; + for(const auto connection : existedConnection) { - Connection *connection = *itConnection; if (connection->isFull()) return true; } @@ -476,15 +470,14 @@ void Processor::onTrigger() void Processor::waitForWork(uint64_t timeoutMs) { - std::unique_lock<std::mutex> lock(_workAvailableMtx); - _hasWork = isWorkAvailable(); + _hasWork.store( isWorkAvailable() ); - if (!_hasWork) + if (!_hasWork.load()) { - _hasWorkCondition.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] { return _hasWork; }); + std::unique_lock<std::mutex> lock(_workAvailableMtx); + _hasWorkCondition.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] { return _hasWork.load(); }); } - lock.unlock(); } void Processor::notifyWork() @@ -496,17 +489,12 @@ void Processor::notifyWork() } { - std::unique_lock<std::mutex> lock(_workAvailableMtx); - _hasWork = isWorkAvailable(); - - // Keep a scope-local copy of the state to avoid race conditions - bool hasWork = _hasWork; + _hasWork.store( isWorkAvailable() ); - lock.unlock(); - if (hasWork) + if (_hasWork.load()) { - _hasWorkCondition.notify_one(); + _hasWorkCondition.notify_one(); } } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/Provenance.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Provenance.cpp b/libminifi/src/Provenance.cpp index a2a4310..c21de76 100644 --- a/libminifi/src/Provenance.cpp +++ b/libminifi/src/Provenance.cpp @@ -17,648 +17,369 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +#include <cstdint> +#include <vector> +#include <arpa/inet.h> +#include "Serializable.h" #include "Provenance.h" #include "Relationship.h" #include "Logger.h" #include "FlowController.h" -int ProvenanceEventRecord::readUTF(std::string &str, bool widen) -{ - uint16_t utflen; - int ret; - - if (!widen) - { - ret = read(utflen); - if (ret <= 0) - return ret; - } - else - { - uint32_t len; - ret = read(len); - if (ret <= 0) - return ret; - utflen = len; - } - - uint8_t *bytearr = NULL; - char *chararr = NULL; - bytearr = new uint8_t[utflen]; - chararr = new char[utflen]; - memset(chararr, 0, utflen); - - int c, char2, char3; - int count = 0; - int chararr_count=0; - - ret = read(bytearr, utflen); - if (ret <= 0) - { - delete[] bytearr; - delete[] chararr; - if (ret == 0) - { - if (!widen) - return (2 + utflen); - else - return (4 + utflen); - } - else - return ret; - } - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - if (c > 127) break; - count++; - chararr[chararr_count++]=(char)c; - } - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - switch (c >> 4) { - case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7: - /* 0xxxxxxx*/ - count++; - chararr[chararr_count++]=(char)c; - break; - case 12: case 13: - /* 110x xxxx 10xx xxxx*/ - count += 2; - if (count > utflen) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - char2 = (int) bytearr[count-1]; - if ((char2 & 0xC0) != 0x80) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - chararr[chararr_count++]=(char)(((c & 0x1F) << 6) | - (char2 & 0x3F)); - break; - case 14: - /* 1110 xxxx 10xx xxxx 10xx xxxx */ - count += 3; - if (count > utflen) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - char2 = (int) bytearr[count-2]; - char3 = (int) bytearr[count-1]; - if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - chararr[chararr_count++]=(char)(((c & 0x0F) << 12) | - ((char2 & 0x3F) 
<< 6) | - ((char3 & 0x3F) << 0)); - break; - default: - delete[] bytearr; - delete[] chararr; - return -1; - } - } - // The number of chars produced may be less than utflen - std::string value(chararr, chararr_count); - str = value; - delete[] bytearr; - delete[] chararr; - if (!widen) - return (2 + utflen); - else - return (4 + utflen); -} - -int ProvenanceEventRecord::writeUTF(std::string str, bool widen) -{ - int strlen = str.length(); - int utflen = 0; - int c, count = 0; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.at(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) - return -1; - - uint8_t *bytearr = NULL; - if (!widen) - { - bytearr = new uint8_t[utflen+2]; - bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF); - } - else - { - bytearr = new uint8_t[utflen+4]; - bytearr[count++] = (uint8_t) ((utflen >> 24) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 16) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF); - } - - int i=0; - for (i=0; i<strlen; i++) { - c = str.at(i); - if (!((c >= 0x0001) && (c <= 0x007F))) break; - bytearr[count++] = (uint8_t) c; - } - - for (;i < strlen; i++){ - c = str.at(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (uint8_t) c; - } else if (c > 0x07FF) { - bytearr[count++] = (uint8_t) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (uint8_t) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (uint8_t) (0x80 | ((c >> 0) & 0x3F)); - } else { - bytearr[count++] = (uint8_t) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (uint8_t) (0x80 | ((c >> 0) & 0x3F)); - } - } - int ret; - if (!widen) - { - ret = writeData(bytearr, utflen+2); - } - else - { - ret = writeData(bytearr, utflen+4); - } - delete[] bytearr; - return ret; -} - 
//! DeSerialize -bool ProvenanceEventRecord::DeSerialize(ProvenanceRepository *repo, std::string key) -{ +bool ProvenanceEventRecord::DeSerialize(ProvenanceRepository *repo, + std::string key) { std::string value; bool ret; ret = repo->Get(key, value); - if (!ret) - { - _logger->log_error("NiFi Provenance Store event %s can not found", key.c_str()); + if (!ret) { + _logger->log_error("NiFi Provenance Store event %s can not found", + key.c_str()); return false; - } - else - _logger->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), value.length()); + } else + _logger->log_debug("NiFi Provenance Read event %s length %d", + key.c_str(), value.length()); - ret = DeSerialize((unsigned char *) value.data(), value.length()); - if (ret) - { - _logger->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", _eventIdStr.c_str(), _serializeBufSize, _eventType); - } - else - { - _logger->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", _eventIdStr.c_str(), _serializeBufSize, _eventType); + DataStream stream((const uint8_t*)value.data(),value.length()); + + ret = DeSerialize(stream); + + if (ret) { + _logger->log_debug( + "NiFi Provenance retrieve event %s size %d eventType %d success", + _eventIdStr.c_str(), stream.getSize(), _eventType); + } else { + _logger->log_debug( + "NiFi Provenance retrieve event %s size %d eventType %d fail", + _eventIdStr.c_str(), stream.getSize(), _eventType); } return ret; } -bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo) -{ - if (_serializedBuf) - // Serialize in progress - return false; - _serializedBuf = NULL; - _serializeBufSize = 0; - _maxSerializeBufSize = 0; - _serializedBuf = new uint8_t[PROVENANCE_EVENT_RECORD_SEG_SIZE]; - if (!_serializedBuf) - return false; - _maxSerializeBufSize = PROVENANCE_EVENT_RECORD_SEG_SIZE; +bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo) { + + DataStream outStream; int ret; - ret = writeUTF(this->_eventIdStr); - if 
(ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_eventIdStr,&outStream); + if (ret <= 0) { + return false; } uint32_t eventType = this->_eventType; - ret = write(eventType); - if (ret != 4) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(eventType,&outStream); + if (ret != 4) { + return false; } - ret = write(this->_eventTime); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_eventTime,&outStream); + if (ret != 8) { + return false; } - ret = write(this->_entryDate); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_entryDate,&outStream); + if (ret != 8) { return false; } - ret = write(this->_eventDuration); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_eventDuration,&outStream); + if (ret != 8) { + return false; } - ret = write(this->_lineageStartDate); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_lineageStartDate,&outStream); + if (ret != 8) { + return false; } - ret = writeUTF(this->_componentId); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_componentId,&outStream); + if (ret <= 0) { + return false; } - ret = writeUTF(this->_componentType); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_componentType,&outStream); + if (ret <= 0) { + return false; } - ret = writeUTF(this->_uuid); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_uuid,&outStream); + if (ret <= 0) { + return false; } - ret = writeUTF(this->_details); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_details,&outStream); + if (ret <= 0) { + return false; } // write flow attributes uint32_t numAttributes = this->_attributes.size(); - ret = write(numAttributes); - if (ret 
!= 4) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(numAttributes,&outStream); + if (ret != 4) { + return false; } - std::map<std::string, std::string>::iterator itAttribute; - for (itAttribute = this->_attributes.begin(); itAttribute!= this->_attributes.end(); itAttribute++) - { - ret = writeUTF(itAttribute->first, true); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + for (auto itAttribute : _attributes) { + ret = writeUTF(itAttribute.first,&outStream, true); + if (ret <= 0) { + return false; } - ret = writeUTF(itAttribute->second, true); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(itAttribute.second,&outStream, true); + if (ret <= 0) { + return false; } } - ret = writeUTF(this->_contentFullPath); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_contentFullPath,&outStream); + if (ret <= 0) { + return false; } - ret = write(this->_size); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_size,&outStream); + if (ret != 8) { + return false; } - ret = write(this->_offset); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_offset,&outStream); + if (ret != 8) { + return false; } - ret = writeUTF(this->_sourceQueueIdentifier); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_sourceQueueIdentifier,&outStream); + if (ret <= 0) { + return false; } - if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) - { + if (this->_eventType == ProvenanceEventRecord::FORK + || this->_eventType == ProvenanceEventRecord::CLONE + || this->_eventType == ProvenanceEventRecord::JOIN) { // write UUIDs uint32_t number = this->_parentUuids.size(); - ret = write(number); - if (ret != 4) - { - delete[] _serializedBuf; - 
_serializedBuf = NULL; + ret = write(number,&outStream); + if (ret != 4) { + return false; } - std::vector<std::string>::iterator it; - for (it = this->_parentUuids.begin(); it!= this->_parentUuids.end(); it++) - { - std::string parentUUID = *it; - ret = writeUTF(parentUUID); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + for (auto parentUUID : _parentUuids) { + ret = writeUTF(parentUUID,&outStream); + if (ret <= 0) { + return false; } } number = this->_childrenUuids.size(); - ret = write(number); - if (ret != 4) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(number,&outStream); + if (ret != 4) { return false; } - for (it = this->_childrenUuids.begin(); it!= this->_childrenUuids.end(); it++) - { - std::string childUUID = *it; - ret = writeUTF(childUUID); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + for (auto childUUID : _childrenUuids) { + ret = writeUTF(childUUID,&outStream); + if (ret <= 0) { + return false; } } - } - else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) - { - ret = writeUTF(this->_transitUri); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + } else if (this->_eventType == ProvenanceEventRecord::SEND + || this->_eventType == ProvenanceEventRecord::FETCH) { + ret = writeUTF(this->_transitUri,&outStream); + if (ret <= 0) { + return false; } - } - else if (this->_eventType == ProvenanceEventRecord::RECEIVE) - { - ret = writeUTF(this->_transitUri); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { + ret = writeUTF(this->_transitUri,&outStream); + if (ret <= 0) { + return false; } - ret = writeUTF(this->_sourceSystemFlowFileIdentifier); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_sourceSystemFlowFileIdentifier,&outStream); + if (ret <= 0) { + return false; 
} } // Persistent to the DB - if (repo->Put(_eventIdStr, _serializedBuf, _serializeBufSize)) - { - _logger->log_debug("NiFi Provenance Store event %s size %d success", _eventIdStr.c_str(), _serializeBufSize); - } - else - { - _logger->log_error("NiFi Provenance Store event %s size %d fail", _eventIdStr.c_str(), _serializeBufSize); + if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { + _logger->log_debug("NiFi Provenance Store event %s size %d success", + _eventIdStr.c_str(), outStream.getSize()); + } else { + _logger->log_error("NiFi Provenance Store event %s size %d fail", + _eventIdStr.c_str(), outStream.getSize()); } // cleanup - delete[] (_serializedBuf); - _serializedBuf = NULL; - _serializeBufSize = 0; return true; } -bool ProvenanceEventRecord::DeSerialize(uint8_t *buffer, int bufferSize) -{ - _serializedBuf = buffer; - _serializeBufSize = 0; - _maxSerializeBufSize = bufferSize; +bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { int ret; - ret = readUTF(this->_eventIdStr); - if (ret <= 0) - { + DataStream outStream(buffer,bufferSize); + + ret = readUTF(this->_eventIdStr,&outStream); + + if (ret <= 0) { return false; } uint32_t eventType; - ret = read(eventType); - if (ret != 4) - { + ret = read(eventType,&outStream); + if (ret != 4) { return false; } this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType; - ret = read(this->_eventTime); - if (ret != 8) - { + ret = read(this->_eventTime,&outStream); + if (ret != 8) { return false; } - ret = read(this->_entryDate); - if (ret != 8) - { + ret = read(this->_entryDate,&outStream); + if (ret != 8) { return false; } - ret = read(this->_eventDuration); - if (ret != 8) - { + ret = read(this->_eventDuration,&outStream); + if (ret != 8) { return false; } - ret = read(this->_lineageStartDate); - if (ret != 8) - { + ret = read(this->_lineageStartDate,&outStream); + if (ret != 8) { return false; } - ret = 
readUTF(this->_componentId); - if (ret <= 0) - { + ret = readUTF(this->_componentId,&outStream); + if (ret <= 0) { return false; } - ret = readUTF(this->_componentType); - if (ret <= 0) - { + ret = readUTF(this->_componentType,&outStream); + if (ret <= 0) { return false; } - ret = readUTF(this->_uuid); - if (ret <= 0) - { + ret = readUTF(this->_uuid,&outStream); + if (ret <= 0) { return false; } - ret = readUTF(this->_details); - if (ret <= 0) - { + ret = readUTF(this->_details,&outStream); + + if (ret <= 0) { return false; } // read flow attributes uint32_t numAttributes = 0; - ret = read(numAttributes); - if (ret != 4) - { + ret = read(numAttributes,&outStream); + if (ret != 4) { return false; } - for (uint32_t i = 0; i < numAttributes; i++) - { + for (uint32_t i = 0; i < numAttributes; i++) { std::string key; - ret = readUTF(key, true); - if (ret <= 0) - { + ret = readUTF(key,&outStream, true); + if (ret <= 0) { return false; } std::string value; - ret = readUTF(value, true); - if (ret <= 0) - { + ret = readUTF(value,&outStream, true); + if (ret <= 0) { return false; } this->_attributes[key] = value; } - ret = readUTF(this->_contentFullPath); - if (ret <= 0) - { + ret = readUTF(this->_contentFullPath,&outStream); + if (ret <= 0) { return false; } - ret = read(this->_size); - if (ret != 8) - { + ret = read(this->_size,&outStream); + if (ret != 8) { return false; } - ret = read(this->_offset); - if (ret != 8) - { + ret = read(this->_offset,&outStream); + if (ret != 8) { return false; } - ret = readUTF(this->_sourceQueueIdentifier); - if (ret <= 0) - { + ret = readUTF(this->_sourceQueueIdentifier,&outStream); + if (ret <= 0) { return false; } - if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) - { + if (this->_eventType == ProvenanceEventRecord::FORK + || this->_eventType == ProvenanceEventRecord::CLONE + || this->_eventType == ProvenanceEventRecord::JOIN) { // 
read UUIDs uint32_t number = 0; - ret = read(number); - if (ret != 4) - { + ret = read(number,&outStream); + if (ret != 4) { return false; } - for (uint32_t i = 0; i < number; i++) - { + + + for (uint32_t i = 0; i < number; i++) { std::string parentUUID; - ret = readUTF(parentUUID); - if (ret <= 0) - { + ret = readUTF(parentUUID,&outStream); + if (ret <= 0) { return false; } this->addParentUuid(parentUUID); } number = 0; - ret = read(number); - if (ret != 4) - { + ret = read(number,&outStream); + if (ret != 4) { return false; } - for (uint32_t i = 0; i < number; i++) - { + for (uint32_t i = 0; i < number; i++) { std::string childUUID; - ret = readUTF(childUUID); - if (ret <= 0) - { + ret = readUTF(childUUID,&outStream); + if (ret <= 0) { return false; } this->addChildUuid(childUUID); } - } - else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) - { - ret = readUTF(this->_transitUri); - if (ret <= 0) - { + } else if (this->_eventType == ProvenanceEventRecord::SEND + || this->_eventType == ProvenanceEventRecord::FETCH) { + ret = readUTF(this->_transitUri,&outStream); + if (ret <= 0) { return false; } - } - else if (this->_eventType == ProvenanceEventRecord::RECEIVE) - { - ret = readUTF(this->_transitUri); - if (ret <= 0) - { + } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { + ret = readUTF(this->_transitUri,&outStream); + if (ret <= 0) { return false; } - ret = readUTF(this->_sourceSystemFlowFileIdentifier); - if (ret <= 0) - { + ret = readUTF(this->_sourceSystemFlowFileIdentifier,&outStream); + if (ret <= 0) { return false; } } @@ -666,35 +387,32 @@ bool ProvenanceEventRecord::DeSerialize(uint8_t *buffer, int bufferSize) return true; } -void ProvenanceReporter::commit() -{ - for (std::set<ProvenanceEventRecord*>::iterator it = _events.begin(); it != _events.end(); ++it) - { - ProvenanceEventRecord *event = (ProvenanceEventRecord *) (*it); - if 
(!FlowController::getFlowController()->getProvenanceRepository()->isFull()) - event->Serialize(FlowController::getFlowController()->getProvenanceRepository()); - else +void ProvenanceReporter::commit() { + for (auto event : _events) { + if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull()) { + event->Serialize( + FlowControllerFactory::getFlowController()->getProvenanceRepository()); + } else { _logger->log_debug("Provenance Repository is full"); + } } } -void ProvenanceReporter::create(FlowFileRecord *flow, std::string detail) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE, flow); +void ProvenanceReporter::create(FlowFileRecord *flow, std::string detail) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE, + flow); - if (event) - { + if (event) { event->setDetails(detail); add(event); } } -void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation, std::string detail, uint64_t processingDuration) -{ +void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation, + std::string detail, uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ROUTE, flow); - if (event) - { + if (event) { event->setDetails(detail); event->setRelationship(relation.getName()); event->setEventDuration(processingDuration); @@ -702,51 +420,49 @@ void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation, std: } } -void ProvenanceReporter::modifyAttributes(FlowFileRecord *flow, std::string detail) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow); +void ProvenanceReporter::modifyAttributes(FlowFileRecord *flow, + std::string detail) { + ProvenanceEventRecord *event = allocate( + ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow); - if (event) - { + if (event) { event->setDetails(detail); add(event); } } -void ProvenanceReporter::modifyContent(FlowFileRecord *flow, std::string detail, 
uint64_t processingDuration) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow); +void ProvenanceReporter::modifyContent(FlowFileRecord *flow, std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate( + ProvenanceEventRecord::CONTENT_MODIFIED, flow); - if (event) - { + if (event) { event->setDetails(detail); event->setEventDuration(processingDuration); add(event); } } -void ProvenanceReporter::clone(FlowFileRecord *parent, FlowFileRecord *child) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE, parent); +void ProvenanceReporter::clone(FlowFileRecord *parent, FlowFileRecord *child) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE, + parent); - if (event) - { + if (event) { event->addChildFlowFile(child); event->addParentFlowFile(parent); add(event); } } -void ProvenanceReporter::join(std::vector<FlowFileRecord *> parents, FlowFileRecord *child, std::string detail, uint64_t processingDuration) -{ +void ProvenanceReporter::join(std::vector<FlowFileRecord *> parents, + FlowFileRecord *child, std::string detail, + uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::JOIN, child); - if (event) - { + if (event) { event->addChildFlowFile(child); std::vector<FlowFileRecord *>::iterator it; - for (it = parents.begin(); it!= parents.end(); it++) - { + for (it = parents.begin(); it != parents.end(); it++) { FlowFileRecord *record = *it; event->addParentFlowFile(record); } @@ -756,16 +472,16 @@ void ProvenanceReporter::join(std::vector<FlowFileRecord *> parents, FlowFileRec } } -void ProvenanceReporter::fork(std::vector<FlowFileRecord *> child, FlowFileRecord *parent, std::string detail, uint64_t processingDuration) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK, parent); +void ProvenanceReporter::fork(std::vector<FlowFileRecord *> child, + FlowFileRecord *parent, std::string detail, + 
uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK, + parent); - if (event) - { + if (event) { event->addParentFlowFile(parent); std::vector<FlowFileRecord *>::iterator it; - for (it = child.begin(); it!= child.end(); it++) - { + for (it = child.begin(); it != child.end(); it++) { FlowFileRecord *record = *it; event->addChildFlowFile(record); } @@ -775,71 +491,66 @@ void ProvenanceReporter::fork(std::vector<FlowFileRecord *> child, FlowFileRecor } } -void ProvenanceReporter::expire(FlowFileRecord *flow, std::string detail) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE, flow); +void ProvenanceReporter::expire(FlowFileRecord *flow, std::string detail) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE, + flow); - if (event) - { + if (event) { event->setDetails(detail); add(event); } } -void ProvenanceReporter::drop(FlowFileRecord *flow, std::string reason) -{ +void ProvenanceReporter::drop(FlowFileRecord *flow, std::string reason) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::DROP, flow); - if (event) - { + if (event) { std::string dropReason = "Discard reason: " + reason; event->setDetails(dropReason); add(event); } } -void ProvenanceReporter::send(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force) -{ +void ProvenanceReporter::send(FlowFileRecord *flow, std::string transitUri, + std::string detail, uint64_t processingDuration, bool force) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::SEND, flow); - if (event) - { + if (event) { event->setTransitUri(transitUri); event->setDetails(detail); event->setEventDuration(processingDuration); - if (!force) - { + if (!force) { add(event); - } - else - { - if (!FlowController::getFlowController()->getProvenanceRepository()->isFull()) - event->Serialize(FlowController::getFlowController()->getProvenanceRepository()); + } else { + if 
(!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull()) + event->Serialize( + FlowControllerFactory::getFlowController()->getProvenanceRepository()); delete event; } } } -void ProvenanceReporter::receive(FlowFileRecord *flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE, flow); +void ProvenanceReporter::receive(FlowFileRecord *flow, std::string transitUri, + std::string sourceSystemFlowFileIdentifier, std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE, + flow); - if (event) - { + if (event) { event->setTransitUri(transitUri); event->setDetails(detail); event->setEventDuration(processingDuration); - event->setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier); + event->setSourceSystemFlowFileIdentifier( + sourceSystemFlowFileIdentifier); add(event); } } -void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration) -{ +void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri, + std::string detail, uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FETCH, flow); - if (event) - { + if (event) { event->setTransitUri(transitUri); event->setDetails(detail); event->setEventDuration(processingDuration); @@ -849,8 +560,7 @@ void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri, std uint64_t ProvenanceRepository::_repoSize = 0; -void ProvenanceRepository::start() -{ +void ProvenanceRepository::start() { if (this->_purgePeriod <= 0) return; if (_running) @@ -861,48 +571,47 @@ void ProvenanceRepository::start() _thread->detach(); } -void ProvenanceRepository::stop() -{ +void ProvenanceRepository::stop() { if (!_running) return; _running = false; 
_logger->log_info("ProvenanceRepository Monitor Thread Stop"); } -void ProvenanceRepository::run(ProvenanceRepository *repo) -{ +void ProvenanceRepository::run(ProvenanceRepository *repo) { // threshold for purge - uint64_t purgeThreshold = repo->_maxPartitionBytes*3/4; - while (repo->_running) - { - std::this_thread::sleep_for(std::chrono::milliseconds(repo->_purgePeriod)); + uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4; + while (repo->_running) { + std::this_thread::sleep_for( + std::chrono::milliseconds(repo->_purgePeriod)); uint64_t curTime = getTimeMillis(); uint64_t size = repo->repoSize(); - if (size >= purgeThreshold) - { + if (size >= purgeThreshold) { std::vector<std::string> purgeList; - leveldb::Iterator* it = repo->_db->NewIterator(leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) - { + leveldb::Iterator* it = repo->_db->NewIterator( + leveldb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { ProvenanceEventRecord eventRead; std::string key = it->key().ToString(); - if (eventRead.DeSerialize((uint8_t *)it->value().data(), (int) it->value().size())) - { - if ((curTime - eventRead.getEventTime()) > repo->_maxPartitionMillis) + if (eventRead.DeSerialize((uint8_t *) it->value().data(), + (int) it->value().size())) { + if ((curTime - eventRead.getEventTime()) + > repo->_maxPartitionMillis) purgeList.push_back(key); - } - else - { - repo->_logger->log_debug("NiFi Provenance retrieve event %s fail", key.c_str()); + } else { + repo->_logger->log_debug( + "NiFi Provenance retrieve event %s fail", + key.c_str()); purgeList.push_back(key); } } delete it; std::vector<std::string>::iterator itPurge; - for (itPurge = purgeList.begin(); itPurge!= purgeList.end(); itPurge++) - { + for (itPurge = purgeList.begin(); itPurge != purgeList.end(); + itPurge++) { std::string eventId = *itPurge; - repo->_logger->log_info("ProvenanceRepository Repo Purge %s", eventId.c_str()); + 
repo->_logger->log_info("ProvenanceRepository Repo Purge %s", + eventId.c_str()); repo->Delete(eventId); } } @@ -914,6 +623,3 @@ void ProvenanceRepository::run(ProvenanceRepository *repo) return; } - - - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/PutFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/PutFile.cpp b/libminifi/src/PutFile.cpp index 3f209ce..e5328b9 100644 --- a/libminifi/src/PutFile.cpp +++ b/libminifi/src/PutFile.cpp @@ -136,11 +136,13 @@ bool PutFile::putFile(ProcessSession *session, FlowFileRecord *flowFile, const s if (cb.commit()) { session->transfer(flowFile, Success); + return true; } else { session->transfer(flowFile, Failure); } + return false; } PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/ResourceClaim.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp index 3c22ac9..a82f647 100644 --- a/libminifi/src/ResourceClaim.cpp +++ b/libminifi/src/ResourceClaim.cpp @@ -25,10 +25,14 @@ std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0); + +std::string ResourceClaim::default_directory_path=DEFAULT_CONTENT_DIRECTORY; + ResourceClaim::ResourceClaim(const std::string contentDirectory) : _id(_localResourceClaimNumber.load()), _flowFileRecordOwnedCount(0) { + char uuidStr[37]; // Generate the global UUID for the resource claim http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/Serializable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Serializable.cpp b/libminifi/src/Serializable.cpp new file mode 100644 index 0000000..91330a0 --- /dev/null +++ b/libminifi/src/Serializable.cpp @@ -0,0 +1,365 @@ +/** + * + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <iostream> +#include <cstdint> +#include <cstdio> +#include <cstring> +#include <arpa/inet.h> +#include "Serializable.h" + +#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32)) + +bool EndiannessCheck::IS_LITTLE = EndiannessCheck::is_little_endian(); + +#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)),1) + +template<typename T> +int Serializable::writeData(const T &t,DataStream *stream) { + uint8_t bytes[sizeof t]; + std::copy(static_cast<const char*>(static_cast<const void*>(&t)), + static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, + bytes); + return stream->writeData(bytes, sizeof t); +} + +template<typename T> +int Serializable::writeData(const T &t, uint8_t *to_vec) { + std::copy(static_cast<const char*>(static_cast<const void*>(&t)), + static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, + to_vec); + return sizeof t; +} + +template<typename T> +int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) { + uint8_t bytes[sizeof t]; + std::copy(static_cast<const char*>(static_cast<const void*>(&t)), + static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, + bytes); + 
to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]); + return sizeof t; +} + + + + + +int Serializable::write(uint8_t value,DataStream *stream) { + return stream->writeData(&value, 1); +} +int Serializable::write(char value,DataStream *stream) { + return stream->writeData((uint8_t *) &value, 1); +} + +int Serializable::write(uint8_t *value, int len,DataStream *stream) { + return stream->writeData(value, len); +} + +int Serializable::write(bool value) { + uint8_t temp = value; + return write(temp); +} + +int Serializable::read(uint8_t &value,DataStream *stream) { + uint8_t buf; + + int ret = stream->readData(&buf, 1); + if (ret == 1) + value = buf; + return ret; +} + +int Serializable::read(char &value,DataStream *stream) { + uint8_t buf; + + int ret = stream->readData(&buf, 1); + if (ret == 1) + value = (char) buf; + return ret; +} + +int Serializable::read(uint8_t *value, int len,DataStream *stream) { + return stream->readData(value, len); +} + +int Serializable::read(uint16_t &value,DataStream *stream, bool is_little_endian) { + + return stream->readShort(value, is_little_endian); +} + +int Serializable::read(uint32_t &value,DataStream *stream, bool is_little_endian) { + + return stream->readLong(value, is_little_endian); + +} +int Serializable::read(uint64_t &value,DataStream *stream, bool is_little_endian) { + + return stream->readLongLong(value, is_little_endian); + +} + +int Serializable::write(uint32_t base_value,DataStream *stream, bool is_little_endian) { + + const uint32_t value = is_little_endian ? htonl(base_value) : base_value; + + return writeData(value,stream); +} + +int Serializable::write(uint64_t base_value,DataStream *stream, bool is_little_endian) { + + const uint64_t value = + is_little_endian == 1 ? htonll_r(base_value) : base_value; + return writeData(value,stream); +} + +int Serializable::write(uint16_t base_value,DataStream *stream, bool is_little_endian) { + + const uint16_t value = + is_little_endian == 1 ? 
htons(base_value) : base_value; + + return writeData(value,stream); +} + +int Serializable::readUTF(std::string &str,DataStream *stream, bool widen) { + uint32_t utflen; + int ret = 1; + + if (!widen) { + uint16_t shortLength = 0; + ret = read(shortLength,stream); + utflen = shortLength; + + if (ret <= 0) + return ret; + } else { + uint32_t len; + ret = read(len,stream); + if (ret <= 0) + return ret; + utflen = len; + } + + if (utflen == 0) + return 1; + + std::vector<uint8_t> buf; + ret = stream->readData(buf, utflen); + + // The number of chars produced may be less than utflen + str = std::string((const char*)&buf[0],utflen); + + return utflen; + /* + if (!widen) + return (2 + utflen); + else + return (4 + utflen); + */ +} + +int Serializable::writeUTF(std::string str,DataStream *stream, bool widen) { + int inLength = str.length(); + uint32_t utflen = 0; + int currentPtr = 0; + + /* use charAt instead of copying String to char array */ + for (auto c : str) { + if (IS_ASCII(c)) { + utflen++; + }else if (c > 2047){ + utflen += 3; + } else { + utflen += 2; + } + } + + if (utflen > 65535) + return -1; + + if (utflen == 0) { + + if (!widen) { + uint16_t shortLen = utflen; + write(shortLen,stream); + } else { + + } + return 1; + } + + std::vector<uint8_t> utf_to_write; + if (!widen) { + utf_to_write.resize(utflen); + + uint16_t shortLen = utflen; + + } else { + + utf_to_write.resize(utflen); + + } + + int i = 0; + + + uint8_t *underlyingPtr = &utf_to_write[0]; + for (auto c : str) { + if (IS_ASCII(c)) { + writeData(c, underlyingPtr++); + } else if (c > 2047){ + + auto t = (uint8_t) (((c >> 0x0C) & 15) | 192); + writeData(t, underlyingPtr++); + t = (uint8_t) (((c >> 0x06) & 63) | 128); + writeData(t, underlyingPtr++); + t = (uint8_t) (((c >> 0) & 63) | 128); + writeData(t, underlyingPtr++); + + } else { + auto t = (uint8_t) (((c >> 0x06) & 31) | 192); + writeData(t, underlyingPtr++); + currentPtr++; + t = (uint8_t) (((c >> 0x00) & 63) | 128); + writeData(t, 
underlyingPtr++); + currentPtr++; + + } + } + int ret; + + if (!widen) { + + uint16_t short_length = utflen; + write(short_length,stream); + + for (int i = 0; i < utflen; i++) { + } + for (auto c : utf_to_write) { + } + ret = stream->writeData(utf_to_write.data(), utflen); + } else { + utflen += 4; + write(utflen,stream); + ret = stream->writeData(utf_to_write.data(), utflen); + } + return ret; +} + +int DataStream::writeData(uint8_t *value, int size) { + + /*if (buffer.size() + size < buffer.capacity()) + { + buffer.resize( buffer.size() + size ); + } + */ + std::copy(value,value+size,std::back_inserter(buffer)); + + return size; +} + +int DataStream::readLongLong(uint64_t &value, bool is_little_endian) { + if ((8 + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + uint8_t *buf = &buffer[readBuffer]; + + if (is_little_endian) { + value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) + | ((uint64_t) (buf[2] & 255) << 40) + | ((uint64_t) (buf[3] & 255) << 32) + | ((uint64_t) (buf[4] & 255) << 24) + | ((uint64_t) (buf[5] & 255) << 16) + | ((uint64_t) (buf[6] & 255) << 8) + | ((uint64_t) (buf[7] & 255) << 0); + } else { + value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) + | ((uint64_t) (buf[2] & 255) << 16) + | ((uint64_t) (buf[3] & 255) << 24) + | ((uint64_t) (buf[4] & 255) << 32) + | ((uint64_t) (buf[5] & 255) << 40) + | ((uint64_t) (buf[6] & 255) << 48) + | ((uint64_t) (buf[7] & 255) << 56); + } + readBuffer += 8; + return 8; +} + +int DataStream::readLong(uint32_t &value, bool is_little_endian) { + if ((4 + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + uint8_t *buf = &buffer[readBuffer]; + + if (is_little_endian) { + value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; + } else { + value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; + + } + readBuffer += 4; + return 4; +} + +int DataStream::readShort(uint16_t &value, bool is_little_endian) { + + if ((2 + readBuffer) 
> buffer.size()) { + // if read exceed + return -1; + } + uint8_t *buf = &buffer[readBuffer]; + + if (is_little_endian) { + value = (buf[0] << 8) | buf[1]; + } else { + value = buf[0] | buf[1] << 8; + + } + readBuffer += 2; + return 2; +} + +int DataStream::readData(std::vector<uint8_t> &buf,int buflen) { + if ((buflen + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + + if (buf.capacity() < buflen) + buf.resize(buflen); + + buf.insert(buf.begin(),&buffer[readBuffer],&buffer[readBuffer+buflen]); + + readBuffer += buflen; + return buflen; +} + + +int DataStream::readData(uint8_t *buf,int buflen) { + if ((buflen + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + + std::copy(&buffer[readBuffer],&buffer[readBuffer+buflen],buf); + + readBuffer += buflen; + return buflen; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/Site2SitePeer.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp index fb20767..3d6166b 100644 --- a/libminifi/src/Site2SitePeer.cpp +++ b/libminifi/src/Site2SitePeer.cpp @@ -138,7 +138,7 @@ bool Site2SitePeer::Open() } // OpenSSL init - SSL_CTX *ctx = FlowController::getFlowController()->getSSLContext(); + SSL_CTX *ctx = FlowControllerFactory::getFlowController()->getSSLContext(); if (ctx) { // we have s2s secure config http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/test/Server.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp index f7bd3dd..65245f6 100644 --- a/libminifi/test/Server.cpp +++ b/libminifi/test/Server.cpp @@ -15,7 +15,6 @@ #include <errno.h> #include <chrono> #include <thread> -#include <iostream> // std::cout #include <fstream> // std::ifstream #include <signal.h> 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index a0950e1..4e7e73f 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -18,8 +18,68 @@ #ifndef LIBMINIFI_TEST_TESTBASE_H_ #define LIBMINIFI_TEST_TESTBASE_H_ - +#include <cstdio> +#include <cstdlib> +#include "ResourceClaim.h" #include "catch.hpp" +#include "Logger.h" +#include <vector> + + +class LogTestController { +public: + LogTestController(const std::string level = "debug") { + Logger::getLogger()->setLogLevel(level); + } + + + void enableDebug() + { + Logger::getLogger()->setLogLevel("debug"); + } + + ~LogTestController() { + Logger::getLogger()->setLogLevel(LOG_LEVEL_E::info); + } +}; + +class TestController{ +public: + + + + TestController() : log("info") + { + ResourceClaim::default_directory_path = "./"; + } + + ~TestController() + { + for(auto dir : directories) + { + rmdir(dir); + } + } + + void enableDebug() { + log.enableDebug(); + } + + char *createTempDirectory(char *format) + { + char *dir = mkdtemp(format); + return dir; + } + +protected: + LogTestController log; + std::vector<char*> directories; + + +}; + + + #endif /* LIBMINIFI_TEST_TESTBASE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/test/unit/ProcessorTests.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.h b/libminifi/test/unit/ProcessorTests.h index 0cb6f65..0c17824 100644 --- a/libminifi/test/unit/ProcessorTests.h +++ b/libminifi/test/unit/ProcessorTests.h @@ -15,12 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include <cstdlib> #include <uuid/uuid.h> #include <fstream> +#include "FlowController.h" +#include "ProvenanceTests.h" #include "../TestBase.h" #include "GetFile.h" +#ifndef PROCESSOR_TESTS +#define PROCESSOR_TESTS TEST_CASE("Test Creation of GetFile", "[getfileCreate]"){ GetFile processor("processorname"); @@ -30,24 +33,33 @@ TEST_CASE("Test Creation of GetFile", "[getfileCreate]"){ TEST_CASE("Test Find file", "[getfileCreate2]"){ + TestController testController; + + testController.enableDebug(); + + ProvenanceTestRepository repo; + TestFlowController controller(repo); + FlowControllerFactory::getFlowController( dynamic_cast<FlowController*>(&controller)); GetFile processor("getfileCreate2"); char format[] ="/tmp/gt.XXXXXX"; - char *dir = mkdtemp(format); + char *dir = testController.createTempDirectory(format); + + uuid_t processoruuid; + REQUIRE( true == processor.getUUID(processoruuid) ); - Connection connection("emptyConnection"); + Connection connection("getfileCreate2Connection"); connection.setRelationship(Relationship("success","description")); // link the connections so that we can test results at the end for this connection.setSourceProcessor(&processor); - uuid_t processoruuid; - uuid_parse(processor.getUUIDStr().c_str(),processoruuid); connection.setSourceProcessorUUID(processoruuid); + connection.setDestinationProcessorUUID(processoruuid); processor.addConnection(&connection); REQUIRE( dir != NULL ); @@ -64,15 +76,15 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){ processor.onTrigger(&context,&session); ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set<ProvenanceEventRecord *> records = reporter->getEvents(); + std::set<ProvenanceEventRecord*> records = reporter->getEvents(); - record = session.get(); + record = session.get(); REQUIRE( record== 0 ); REQUIRE( records.size() == 0 ); std::fstream file; std::stringstream ss; - ss << dir << "/" << "tstFile"; + ss << dir << "/" << "tstFile.ext"; 
file.open(ss.str(),std::ios::out); file << "tempFile"; file.close(); @@ -89,12 +101,54 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){ for(ProvenanceEventRecord *provEventRecord : records) { - REQUIRE (provEventRecord->getComponentType() == processor.getName()); } + session.commit(); + + FlowFileRecord *ffr = session.get(); + + ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + + delete ffr; + + std::set<FlowFileRecord*> expiredFlows; + + REQUIRE( 2 == repo.getRepoMap().size() ); + + for(auto entry: repo.getRepoMap()) + { + ProvenanceEventRecord newRecord; + newRecord.DeSerialize((uint8_t*)entry.second.data(),entry.second.length()); + + bool found = false; + for ( auto provRec : records) + { + if (provRec->getEventId() == newRecord.getEventId() ) + { + REQUIRE( provRec->getEventId() == newRecord.getEventId()); + REQUIRE( provRec->getComponentId() == newRecord.getComponentId()); + REQUIRE( provRec->getComponentType() == newRecord.getComponentType()); + REQUIRE( provRec->getDetails() == newRecord.getDetails()); + REQUIRE( provRec->getEventDuration() == newRecord.getEventDuration()); + found = true; + break; + } + } + if (!found) + throw std::runtime_error("Did not find record"); + + + } + + } + + +#endif + + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 2516ed9..f67a826 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -19,10 +19,17 @@ #define LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ #include "Provenance.h" +#include "FlowController.h" +/** + * Test repository + */ class ProvenanceTestRepository : public ProvenanceRepository { public: + ProvenanceTestRepository() +{ +} //! 
initialize bool initialize() { @@ -59,9 +66,72 @@ public: return false; } } + + const std::map<std::string,std::string> &getRepoMap() const + { + return repositoryResults; + } + protected: std::map<std::string,std::string> repositoryResults; }; +class TestFlowController : public FlowController +{ + +public: + TestFlowController(ProvenanceTestRepository &repo) : ::FlowController() + { + _provenanceRepo = dynamic_cast<ProvenanceRepository*>(&repo); + } + ~TestFlowController() + { + + } + void load(){ + + } + + bool start() + { + _running.store(true); + return true; + } + + void stop(bool force) + { + _running.store(false); + } + void waitUnload(const uint64_t timeToWaitMs) + { + stop(true); + } + + void unload() + { + stop(true); + } + + void reload(std::string file) + { + + } + + bool isRunning() + { + return true; + } + + + Processor *createProcessor(std::string name, uuid_t uuid){ return 0;} + + ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid){ return 0;} + + ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid){ return 0; } + + Connection *createConnection(std::string name, uuid_t uuid){ return 0; } +}; + + #endif /* LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/test/unit/ProvenanceTests.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTests.h b/libminifi/test/unit/ProvenanceTests.h index 63608df..d78de47 100644 --- a/libminifi/test/unit/ProvenanceTests.h +++ b/libminifi/test/unit/ProvenanceTests.h @@ -16,6 +16,9 @@ * limitations under the License. 
*/ + +#ifndef PROVENANCE_TESTS +#define PROVENANCE_TESTS #include "../TestBase.h" #include "ProvenanceTestHelper.h" @@ -23,14 +26,71 @@ #include "FlowFileRecord.h" -TEST_CASE("Test Provenance record creation", "[TestProvenanceEventRecord]"){ - - ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CREATE,"blah","blahblah"); +TEST_CASE("Test Provenance record create", "[TestProvenanceEventRecord]"){ + ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CREATE,"blah","blahblah"); REQUIRE( record1.getAttributes().size() == 0); REQUIRE( record1.getAlternateIdentifierUri().length() == 0); } +TEST_CASE("Test Provenance record serialization", "[TestProvenanceEventRecordSerializeDeser]"){ + + ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CREATE,"componentid","componenttype"); + + std::string eventId = record1.getEventId(); + + std::string smileyface = ":)" ; + record1.setDetails(smileyface); + + ProvenanceTestRepository repo; + uint64_t sample = 65555; + ProvenanceRepository *testRepository = dynamic_cast<ProvenanceRepository*>(&repo); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + ProvenanceEventRecord record2; + REQUIRE( record2.DeSerialize(testRepository,eventId) == true); + REQUIRE( record2.getEventId() == record1.getEventId()); + REQUIRE( record2.getComponentId() == record1.getComponentId()); + REQUIRE( record2.getComponentType() == record1.getComponentType()); + REQUIRE( record2.getDetails() == record1.getDetails()); + REQUIRE( record2.getDetails() == smileyface); + REQUIRE( record2.getEventDuration() == sample); +} + + +TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]"){ + + ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CLONE,"componentid","componenttype"); + std::string eventId = record1.getEventId(); + std::map<std::string, std::string> attributes; + 
attributes.insert(std::pair<std::string,std::string>("potato","potatoe")); + attributes.insert(std::pair<std::string,std::string>("tomato","tomatoe")); + FlowFileRecord ffr1(attributes); + + record1.addChildFlowFile(&ffr1); + + ProvenanceTestRepository repo; + uint64_t sample = 65555; + ProvenanceRepository *testRepository = dynamic_cast<ProvenanceRepository*>(&repo); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + ProvenanceEventRecord record2; + REQUIRE( record2.DeSerialize(testRepository,eventId) == true); + REQUIRE( record1.getChildrenUuids().size() == 1); + REQUIRE( record2.getChildrenUuids().size() == 1); + std::string childId = record2.getChildrenUuids().at(0); + REQUIRE( childId == ffr1.getUUIDStr()); + record2.removeChildUuid(childId); + REQUIRE( record2.getChildrenUuids().size() == 0); + + +} + + + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/main/MiNiFiMain.cpp ---------------------------------------------------------------------- diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index ea916cd..9b99ee6 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -17,7 +17,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include <fcntl.h> #include <stdio.h> +#include <semaphore.h> #include <signal.h> #include <vector> #include <queue> @@ -34,7 +36,7 @@ //! Main thread sleep interval 1 second #define SLEEP_INTERVAL 1 //! Main thread stop wait time -#define STOP_WAIT_TIME 2 +#define STOP_WAIT_TIME_MS 30*1000 //! Default YAML location #define DEFAULT_NIFI_CONFIG_YML "./conf/config.yml" //! 
Default nifi properties file path @@ -43,23 +45,32 @@ #define MINIFI_HOME_ENV_KEY "MINIFI_HOME" /* Define Parser Values for Configuration YAML sections */ -#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller" #define CONFIG_YAML_PROCESSORS_KEY "Processors" +#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller" #define CONFIG_YAML_CONNECTIONS_KEY "Connections" #define CONFIG_YAML_REMOTE_PROCESSING_GROUPS_KEY "Remote Processing Groups" -//! Whether it is running -static bool running = false; +// Variables that allow us to avoid a timed wait. +sem_t *running; //! Flow Controller static FlowController *controller = NULL; +/** + * Removed the stop command from the signal handler so that we could trigger + * unload after we exit the semaphore controlled critical section in main. + * + * Semaphores are a portable choice when using signal handlers. Threads, + * mutexes, and condition variables are not guaranteed to work within + * a signal handler. Consequently we will use the semaphore to avoid thread + * safety issues. + */ void sigHandler(int signal) { + if (signal == SIGINT || signal == SIGTERM) { - controller->stop(true); - sleep(STOP_WAIT_TIME); - running = false; + // avoid stopping the controller here. 
+ sem_post(running); } } @@ -68,6 +79,19 @@ int main(int argc, char **argv) Logger *logger = Logger::getLogger(); logger->setLogLevel(info); + + uint16_t stop_wait_time = STOP_WAIT_TIME_MS; + + std::string graceful_shutdown_seconds = ""; + std::string configured_log_level = ""; + + running = sem_open("MiNiFiMain",O_CREAT,0644,0); + if (running == SEM_FAILED || running == 0) + { + + logger->log_error("could not initialize semaphore"); + perror("initialization failure"); + } // assumes POSIX compliant environment std::string minifiHome; if (const char* env_p = std::getenv(MINIFI_HOME_ENV_KEY)) @@ -85,6 +109,7 @@ int main(int argc, char **argv) minifiHome = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\")); //Remove /bin from path } + if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR || signal(SIGPIPE, SIG_IGN) == SIG_ERR) { logger->log_error("Can not install signal handler"); @@ -95,25 +120,66 @@ int main(int argc, char **argv) configure->setHome(minifiHome); configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE); - controller = FlowController::getFlowController(); + + if (configure->get(Configure::nifi_graceful_shutdown_seconds,graceful_shutdown_seconds)) + { + try + { + stop_wait_time = std::stoi(graceful_shutdown_seconds); + } + catch(const std::out_of_range &e) + { + logger->log_error("%s is out of range. %s",Configure::nifi_graceful_shutdown_seconds,e.what()); + } + catch(const std::invalid_argument &e) + { + logger->log_error("%s contains an invalid argument set. 
%s",Configure::nifi_graceful_shutdown_seconds,e.what()); + } + } + else + { + logger->log_debug("%s not set, defaulting to %d",Configure::nifi_graceful_shutdown_seconds,STOP_WAIT_TIME_MS); + } + + if (configure->get(Configure::nifi_log_level,configured_log_level)) + { + std::cout << "log level is " << configured_log_level << std::endl; + logger->setLogLevel(configured_log_level); + + } + + + + controller = FlowControllerFactory::getFlowController(); // Load flow from specified configuration file controller->load(); // Start Processing the flow - controller->start(); - running = true; + controller->start(); logger->log_info("MiNiFi started"); - // main loop - while (running) - { - sleep(SLEEP_INTERVAL); - } + /** + * Sem wait provides us the ability to have a controlled + * yield without the need for a more complex construct and + * a spin lock + */ + if ( sem_wait(running) != -1 ) + perror("sem_wait"); + + + sem_unlink("MiNiFiMain"); + + /** + * Trigger unload -- wait stop_wait_time + */ + controller->waitUnload(stop_wait_time); - controller->unload(); delete controller; + logger->log_info("MiNiFi exit"); + + return 0; }
