MINIFI-190: Creating initial commit with changes. Resolves MINIFI-194 by using a semaphore in place of the FlowController instance. Stop is performed outside of the signal handler to avoid synchronicity issues.
Resolves MINIFI-192 by using lock_guard based on a conditional. Resolves issues found with MINIFI-190 regarding GetFile. Added pragma definitions for GCC < 4.9. This closes #47. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/09d973ba Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/09d973ba Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/09d973ba Branch: refs/heads/master Commit: 09d973bafb7d798a1348e3527f3484ca9fca26be Parents: 63dbb82 Author: Marc <[email protected]> Authored: Thu Feb 9 20:40:32 2017 -0500 Committer: Aldrin Piri <[email protected]> Committed: Sun Feb 12 15:47:34 2017 -0500 ---------------------------------------------------------------------- .travis.yml | 2 +- CMakeLists.txt | 2 - libminifi/include/Configure.h | 2 + libminifi/include/FlowController.h | 238 +++-- libminifi/include/Logger.h | 32 + libminifi/include/Processor.h | 2 +- libminifi/include/Provenance.h | 219 +---- libminifi/include/ResourceClaim.h | 6 +- libminifi/include/Serializable.h | 294 ++++++ libminifi/src/Configure.cpp | 2 + libminifi/src/FlowControlProtocol.cpp | 21 - libminifi/src/FlowController.cpp | 1103 ++++++++++++----------- libminifi/src/FlowFileRecord.cpp | 2 +- libminifi/src/GetFile.cpp | 55 +- libminifi/src/ProcessGroup.cpp | 14 +- libminifi/src/ProcessSession.cpp | 57 +- libminifi/src/Processor.cpp | 108 +-- libminifi/src/Provenance.cpp | 822 ++++++----------- libminifi/src/PutFile.cpp | 2 + libminifi/src/ResourceClaim.cpp | 4 + libminifi/src/Serializable.cpp | 365 ++++++++ libminifi/src/Site2SitePeer.cpp | 2 +- libminifi/test/Server.cpp | 1 - libminifi/test/TestBase.h | 62 +- libminifi/test/unit/ProcessorTests.h | 72 +- libminifi/test/unit/ProvenanceTestHelper.h | 70 ++ libminifi/test/unit/ProvenanceTests.h | 66 +- main/MiNiFiMain.cpp | 98 +- 28 files changed, 2230 insertions(+), 1493 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 91ef329..b64902f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,7 +22,7 @@ script: # Establish updated toolchain as default - sudo unlink /usr/bin/gcc && sudo ln -s /usr/bin/gcc-4.8 /usr/bin/gcc - sudo unlink /usr/bin/g++ && sudo ln -s /usr/bin/g++-4.8 /usr/bin/g++ - - mkdir ./build && cd ./build && cmake .. && make && make test + - mkdir ./build && cd ./build && cmake .. && make && ./tests addons: apt: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 9900e2f..f19431e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,10 +60,8 @@ endif() list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") find_package(UUID REQUIRED) - file(GLOB SPD_SOURCES "include/spdlog/*") - add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3) add_subdirectory(libminifi) add_subdirectory(main) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Configure.h b/libminifi/include/Configure.h index 00b7742..f4aae3c 100644 --- a/libminifi/include/Configure.h +++ b/libminifi/include/Configure.h @@ -44,6 +44,8 @@ public: static const char *nifi_flow_configuration_file; static const char *nifi_administrative_yield_duration; static const char *nifi_bored_yield_duration; + static const char *nifi_graceful_shutdown_seconds; + static const char *nifi_log_level; static const char *nifi_server_name; static const char *nifi_server_port; static const char *nifi_server_report_interval; 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index b98393e..02c39b1 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -28,7 +28,7 @@ #include <atomic> #include <algorithm> #include <set> -#include <yaml-cpp/yaml.h> +#include "yaml-cpp/yaml.h" #include "Configure.h" #include "Property.h" @@ -75,21 +75,15 @@ struct ProcessorConfig { std::vector<Property> properties; }; -//! FlowController Class -class FlowController -{ +/** + * Flow Controller class. Generally used by FlowController factory + * as a singleton. + */ +class FlowController { public: - static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; - static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; - //! Get the singleton flow controller - static FlowController * getFlowController() - { - if (!_flowController) - { - _flowController = new FlowController(); - } - return _flowController; - } + static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; + static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; + //! passphase for the private file callback static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) { @@ -111,96 +105,104 @@ public: } //! Destructor - virtual ~FlowController(); + virtual ~FlowController(){ + if (_ctx) + SSL_CTX_free(_ctx); + } //! Set FlowController Name - void setName(std::string name) { + virtual void setName(std::string name) { _name = name; } //! Get Flow Controller Name - std::string getName(void) { + virtual std::string getName(void) { return (_name); } //! Set UUID - void setUUID(uuid_t uuid) { + virtual void setUUID(uuid_t uuid) { uuid_copy(_uuid, uuid); } //! 
Get UUID - bool getUUID(uuid_t uuid) { - if (uuid) - { + virtual bool getUUID(uuid_t uuid) { + if (uuid) { uuid_copy(uuid, _uuid); return true; - } - else + } else return false; } //! Set MAX TimerDrivenThreads - void setMaxTimerDrivenThreads(int number) - { + virtual void setMaxTimerDrivenThreads(int number) { _maxTimerDrivenThreads = number; } //! Get MAX TimerDrivenThreads - int getMaxTimerDrivenThreads() - { + virtual int getMaxTimerDrivenThreads() { return _maxTimerDrivenThreads; } //! Set MAX EventDrivenThreads - void setMaxEventDrivenThreads(int number) - { + virtual void setMaxEventDrivenThreads(int number) { _maxEventDrivenThreads = number; } //! Get MAX EventDrivenThreads - int getMaxEventDrivenThreads() - { + virtual int getMaxEventDrivenThreads() { return _maxEventDrivenThreads; } //! Get the provenance repository - ProvenanceRepository *getProvenanceRepository() - { + virtual ProvenanceRepository *getProvenanceRepository() { return this->_provenanceRepo; } - //! Life Cycle related function - //! Load flow YAML from disk, after that, create the root process group and its children, initialize the flows - void load(); + //! Load flow xml from disk, after that, create the root process group and its children, initialize the flows + virtual void load() = 0; + //! Whether the Flow Controller is start running - bool isRunning(); - //! Whether the Flow Controller has already been initialized (loaded flow YAML) - bool isInitialized(); + virtual bool isRunning() { + return _running.load(); + } + //! Whether the Flow Controller has already been initialized (loaded flow XML) + virtual bool isInitialized() { + return _initialized.load(); + } //! Start to run the Flow Controller which internally start the root process group and all its children - bool start(); - //! Stop to run the Flow Controller which internally stop the root process group and all its children - void stop(bool force); - //! 
reload flow controller's configuration - void reload(std::string yamlFile); + virtual bool start() = 0; //! Unload the current flow YAML, clean the root process group and all its children - void unload(); + virtual void stop(bool force) = 0; + //! Asynchronous function trigger unloading and wait for a period of time + virtual void waitUnload(const uint64_t timeToWaitMs) = 0; + //! Unload the current flow xml, clean the root process group and all its children + virtual void unload() = 0; + //! Load new xml + virtual void reload(std::string yamlFile) = 0; //! update property value - void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) - { + void updatePropertyValue(std::string processorName, + std::string propertyName, std::string propertyValue) { if (_root) - _root->updatePropertyValue(processorName, propertyName, propertyValue); + _root->updatePropertyValue(processorName, propertyName, + propertyValue); } //! Create Processor (Node/Input/Output Port) based on the name - Processor *createProcessor(std::string name, uuid_t uuid); + virtual Processor *createProcessor(std::string name, uuid_t uuid) = 0; //! Create Root Processor Group - ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid); + virtual ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid) = 0; //! Create Remote Processor Group - ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid); + virtual ProcessGroup *createRemoteProcessGroup(std::string name, + uuid_t uuid) = 0; //! Create Connection - Connection *createConnection(std::string name, uuid_t uuid); + virtual Connection *createConnection(std::string name, uuid_t uuid) = 0; //! set 8 bytes SerialNumber - void setSerialNumber(uint8_t *number) - { + virtual void setSerialNumber(uint8_t *number) { _protocol->setSerialNumber(number); } + + //! getSSLContext - SSL_CTX *getSSLContext() + virtual SSL_CTX *getSSLContext() { return _ctx; } protected: + + //! 
SSL context + SSL_CTX *_ctx; //! A global unique identifier uuid_t _uuid; @@ -218,6 +220,10 @@ protected: int _maxEventDrivenThreads; //! Config //! FlowFile Repo + //! Whether it is running + std::atomic<bool> _running; + //! Whether it has already been initialized (load the flow XML already) + std::atomic<bool> _initialized; //! Provenance Repo ProvenanceRepository *_provenanceRepo; //! Flow Engines @@ -231,50 +237,130 @@ protected: //! Heart Beat //! FlowControl Protocol FlowControlProtocol *_protocol; - //! SSL context - SSL_CTX *_ctx; + + + FlowController() : + _root(0), _maxTimerDrivenThreads(0), _maxEventDrivenThreads(0), _running( + false), _initialized(false), _provenanceRepo(0), _protocol( + 0), _logger(Logger::getLogger()), _ctx(NULL){ + } private: + //! Logger + Logger *_logger; + +}; + +/** + * Flow Controller implementation that defines the typical flow. + * of events. + */ +class FlowControllerImpl: public FlowController { +public: + + //! Destructor + virtual ~FlowControllerImpl(); + + //! Life Cycle related function + //! Load flow xml from disk, after that, create the root process group and its children, initialize the flows + void load(); + //! Start to run the Flow Controller which internally start the root process group and all its children + bool start(); + //! Stop to run the Flow Controller which internally stop the root process group and all its children + void stop(bool force); + //! Asynchronous function trigger unloading and wait for a period of time + void waitUnload(const uint64_t timeToWaitMs); + //! Unload the current flow xml, clean the root process group and all its children + void unload(); + //! Load new xml + void reload(std::string yamlFile); + //! update property value + void updatePropertyValue(std::string processorName, + std::string propertyName, std::string propertyValue) { + if (_root) + _root->updatePropertyValue(processorName, propertyName, + propertyValue); + } + + //! 
Create Processor (Node/Input/Output Port) based on the name + Processor *createProcessor(std::string name, uuid_t uuid); + //! Create Root Processor Group + ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid); + //! Create Remote Processor Group + ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid); + //! Create Connection + Connection *createConnection(std::string name, uuid_t uuid); + + //! Constructor + /*! + * Create a new Flow Controller + */ + FlowControllerImpl(std::string name = DEFAULT_ROOT_GROUP_NAME); + + + + + friend class FlowControlFactory; + +private: + + + //! Mutex for protection std::mutex _mtx; //! Logger Logger *_logger; - //! Configure Configure *_configure; - //! Whether it is running - std::atomic<bool> _running; - //! Whether it has already been initialized (load the flow YAML already) - std::atomic<bool> _initialized; //! Process Processor Node YAML void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup *parent); //! Process Port YAML - void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction); + void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, + TransferDirection direction); //! Process Root Processor Group YAML void parseRootProcessGroupYaml(YAML::Node rootNode); //! Process Property YAML - void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, Processor *processor); + void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, + Processor *processor); //! Process connection YAML void parseConnectionYaml(YAML::Node *node, ProcessGroup *parent); //! Process Remote Process Group YAML void parseRemoteProcessGroupYaml(YAML::Node *node, ProcessGroup *parent); //! Parse Properties Node YAML for a processor - void parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor); - - static FlowController *_flowController; - - //! Constructor - /*! 
- * Create a new Flow Controller - */ - FlowController(std::string name = DEFAULT_ROOT_GROUP_NAME); + void parsePropertiesNodeYaml(YAML::Node *propertiesNode, + Processor *processor); // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer - FlowController(const FlowController &parent); - FlowController &operator=(const FlowController &parent); + FlowControllerImpl(const FlowController &parent); + FlowControllerImpl &operator=(const FlowController &parent); + +}; +/** + * Flow Controller factory that creates flow controllers or gets the + * assigned instance. + */ +class FlowControllerFactory { +public: + //! Get the singleton flow controller + static FlowController * getFlowController(FlowController *instance = 0) { + if (!_flowController) { + if (NULL == instance) + _flowController = createFlowController(); + else + _flowController = instance; + } + return _flowController; + } + + //! Get the singleton flow controller + static FlowController * createFlowController() { + return dynamic_cast<FlowController*>(new FlowControllerImpl()); + } +private: + static FlowController *_flowController; }; #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/Logger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Logger.h b/libminifi/include/Logger.h index 621df78..38d6a97 100644 --- a/libminifi/include/Logger.h +++ b/libminifi/include/Logger.h @@ -21,6 +21,8 @@ #ifndef __LOGGER_H__ #define __LOGGER_H__ +#include <string> +#include <algorithm> #include <cstdio> #include "spdlog/spdlog.h" @@ -72,6 +74,36 @@ public: return; _spdlog->set_level((spdlog::level::level_enum) level); } + + void setLogLevel(const std::string &level,LOG_LEVEL_E defaultLevel = info ) + { + std::string logLevel = ""; + std::transform(level.begin(), level.end(), logLevel.end(), ::tolower); + + if (logLevel == "trace") { + setLogLevel(trace); + } else if (logLevel 
== "debug") { + setLogLevel(debug); + } else if (logLevel == "info") { + setLogLevel(info); + } else if (logLevel == "notice") { + setLogLevel(notice); + } else if (logLevel == "warn") { + setLogLevel(warn); + } else if (logLevel == "error") { + setLogLevel(err); + } else if (logLevel == "critical") { + setLogLevel(critical); + } else if (logLevel == "alert") { + setLogLevel(alert); + } else if (logLevel == "emerg") { + setLogLevel(emerg); + } else if (logLevel == "off") { + setLogLevel(off); + } else { + setLogLevel(defaultLevel); + } + } //! Destructor ~Logger() {} /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Processor.h b/libminifi/include/Processor.h index 35bf040..a0df577 100644 --- a/libminifi/include/Processor.h +++ b/libminifi/include/Processor.h @@ -343,7 +343,7 @@ private: //! Incoming connection Iterator std::set<Connection *>::iterator _incomingConnectionsIter; //! Condition for whether there is incoming work to do - bool _hasWork = false; + std::atomic<bool> _hasWork; //! Concurrent condition mutex for whether there is incoming work to do std::mutex _workAvailableMtx; //! 
Concurrent condition variable for whether there is incoming work to do http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/Provenance.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Provenance.h b/libminifi/include/Provenance.h index f3e814f..7507c03 100644 --- a/libminifi/include/Provenance.h +++ b/libminifi/include/Provenance.h @@ -20,33 +20,30 @@ #ifndef __PROVENANCE_H__ #define __PROVENANCE_H__ -#include <stdio.h> -#include <errno.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/stat.h> +#include <ftw.h> #include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> #include <atomic> +#include <cstdint> +#include <cstring> +#include <iostream> +#include <map> #include <set> -#include <cassert> -#include <errno.h> -#include <chrono> +#include <string> #include <thread> -#include <ftw.h> -#include "leveldb/db.h" +#include <vector> -#include "TimeUtil.h" -#include "Logger.h" +#include "leveldb/db.h" +#include "leveldb/options.h" +#include "leveldb/slice.h" +#include "leveldb/status.h" #include "Configure.h" -#include "Property.h" -#include "ResourceClaim.h" -#include "Relationship.h" #include "Connection.h" #include "FlowFileRecord.h" +#include "Logger.h" +#include "Property.h" +#include "ResourceClaim.h" +#include "TimeUtil.h" +#include "Serializable.h" // Provenance Event Record Serialization Seg Size #define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048 @@ -54,7 +51,7 @@ class ProvenanceRepository; //! 
Provenance Event Record -class ProvenanceEventRecord +class ProvenanceEventRecord : protected Serializable { public: enum ProvenanceEventType { @@ -176,17 +173,11 @@ public: uuid_generate(_eventId); uuid_unparse(_eventId, eventIdStr); _eventIdStr = eventIdStr; - _serializedBuf = NULL; - _serializeBufSize = 0; - _maxSerializeBufSize = 0; _logger = Logger::getLogger(); } ProvenanceEventRecord() { _eventTime = getTimeMillis(); - _serializedBuf = NULL; - _serializeBufSize = 0; - _maxSerializeBufSize = 0; _logger = Logger::getLogger(); } @@ -399,7 +390,12 @@ public: //! Serialize and Persistent to the repository bool Serialize(ProvenanceRepository *repo); //! DeSerialize - bool DeSerialize(uint8_t *buffer, int bufferSize); + bool DeSerialize(const uint8_t *buffer, const int bufferSize); + //! DeSerialize + bool DeSerialize(DataStream &stream) + { + return DeSerialize(stream.getBuffer(),stream.getSize()); + } //! DeSerialize bool DeSerialize(ProvenanceRepository *repo, std::string key); @@ -456,151 +452,7 @@ private: //! 
Logger Logger *_logger; - // All serialization related method and internal buf - uint8_t *_serializedBuf; - int _serializeBufSize; - int _maxSerializeBufSize; - int writeData(uint8_t *value, int size) - { - if ((_serializeBufSize + size) > _maxSerializeBufSize) - { - // if write exceed - uint8_t *buffer = new uint8_t[_maxSerializeBufSize + PROVENANCE_EVENT_RECORD_SEG_SIZE]; - if (!buffer) - { - return -1; - } - memcpy(buffer, _serializedBuf, _serializeBufSize); - delete[] _serializedBuf; - _serializedBuf = buffer; - _maxSerializeBufSize = _maxSerializeBufSize + PROVENANCE_EVENT_RECORD_SEG_SIZE; - } - uint8_t *bufPtr = _serializedBuf + _serializeBufSize; - memcpy(bufPtr, value, size); - _serializeBufSize += size; - return size; - } - int readData(uint8_t *buf, int buflen) - { - if ((buflen + _serializeBufSize) > _maxSerializeBufSize) - { - // if read exceed - return -1; - } - uint8_t *bufPtr = _serializedBuf + _serializeBufSize; - memcpy(buf, bufPtr, buflen); - _serializeBufSize += buflen; - return buflen; - } - int write(uint8_t value) - { - return writeData(&value, 1); - } - int write(char value) - { - return writeData((uint8_t *)&value, 1); - } - int write(uint32_t value) - { - uint8_t temp[4]; - - temp[0] = (value & 0xFF000000) >> 24; - temp[1] = (value & 0x00FF0000) >> 16; - temp[2] = (value & 0x0000FF00) >> 8; - temp[3] = (value & 0x000000FF); - return writeData(temp, 4); - } - int write(uint16_t value) - { - uint8_t temp[2]; - temp[0] = (value & 0xFF00) >> 8; - temp[1] = (value & 0xFF); - return writeData(temp, 2); - } - int write(uint8_t *value, int len) - { - return writeData(value, len); - } - int write(uint64_t value) - { - uint8_t temp[8]; - - temp[0] = (value >> 56) & 0xFF; - temp[1] = (value >> 48) & 0xFF; - temp[2] = (value >> 40) & 0xFF; - temp[3] = (value >> 32) & 0xFF; - temp[4] = (value >> 24) & 0xFF; - temp[5] = (value >> 16) & 0xFF; - temp[6] = (value >> 8) & 0xFF; - temp[7] = (value >> 0) & 0xFF; - return writeData(temp, 8); - } - int 
write(bool value) - { - uint8_t temp = value; - return write(temp); - } - int writeUTF(std::string str, bool widen = false); - int read(uint8_t &value) - { - uint8_t buf; - - int ret = readData(&buf, 1); - if (ret == 1) - value = buf; - return ret; - } - int read(uint16_t &value) - { - uint8_t buf[2]; - - int ret = readData(buf, 2); - if (ret == 2) - value = (buf[0] << 8) | buf[1]; - return ret; - } - int read(char &value) - { - uint8_t buf; - - int ret = readData(&buf, 1); - if (ret == 1) - value = (char) buf; - return ret; - } - int read(uint8_t *value, int len) - { - return readData(value, len); - } - int read(uint32_t &value) - { - uint8_t buf[4]; - - int ret = readData(buf, 4); - if (ret == 4) - value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; - return ret; - } - int read(uint64_t &value) - { - uint8_t buf[8]; - - int ret = readData(buf, 8); - if (ret == 8) - { - value = ((uint64_t) buf[0] << 56) | - ((uint64_t) (buf[1] & 255) << 48) | - ((uint64_t) (buf[2] & 255) << 40) | - ((uint64_t) (buf[3] & 255) << 32) | - ((uint64_t) (buf[4] & 255) << 24) | - ((uint64_t) (buf[5] & 255) << 16) | - ((uint64_t) (buf[6] & 255) << 8) | - ((uint64_t) (buf[7] & 255) << 0); - } - return ret; - } - int readUTF(std::string &str, bool widen = false); - + // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer ProvenanceEventRecord(const ProvenanceEventRecord &parent); @@ -649,10 +501,9 @@ public: //! clear void clear() { - for (std::set<ProvenanceEventRecord*>::iterator it = _events.begin(); it != _events.end(); ++it) + for (auto it : _events) { - ProvenanceEventRecord *event = (ProvenanceEventRecord *) (*it); - delete event; + delete it; } _events.clear(); } @@ -733,6 +584,7 @@ public: _purgePeriod = PROVENANCE_PURGE_PERIOD; _maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE; _db = NULL; + _thread = NULL; _running = false; _repoFull = false; } @@ -746,7 +598,7 @@ public: } //! 
initialize - bool initialize() + virtual bool initialize() { std::string value; if (_configure->get(Configure::nifi_provenance_repository_directory_default, value)) @@ -786,7 +638,7 @@ public: return true; } //! Put - bool Put(std::string key, uint8_t *buf, int bufLen) + virtual bool Put(std::string key, uint8_t *buf, int bufLen) { // persistent to the DB leveldb::Slice value((const char *) buf, bufLen); @@ -798,7 +650,7 @@ public: return false; } //! Delete - bool Delete(std::string key) + virtual bool Delete(std::string key) { leveldb::Status status; status = _db->Delete(leveldb::WriteOptions(), key); @@ -808,7 +660,7 @@ public: return false; } //! Get - bool Get(std::string key, std::string &value) + virtual bool Get(std::string key, std::string &value) { leveldb::Status status; status = _db->Get(leveldb::ReadOptions(), key, &value); @@ -839,11 +691,11 @@ public: //! Run function for the thread static void run(ProvenanceRepository *repo); //! Start the repository monitor thread - void start(); + virtual void start(); //! Stop the repository monitor thread - void stop(); + virtual void stop(); //! whether the repo is full - bool isFull() + virtual bool isFull() { return _repoFull; } @@ -859,8 +711,8 @@ private: //! Logger Logger *_logger; //! Configure - Configure *_configure; //! max db entry life time + Configure *_configure; int64_t _maxPartitionMillis; //! max db size int64_t _maxPartitionBytes; @@ -899,4 +751,5 @@ private: + #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/ResourceClaim.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h index 098b259..b4085f2 100644 --- a/libminifi/include/ResourceClaim.h +++ b/libminifi/include/ResourceClaim.h @@ -32,15 +32,19 @@ //! Default content directory #define DEFAULT_CONTENT_DIRECTORY "./content_repository" + + //! 
ResourceClaim Class class ResourceClaim { public: + + static std::string default_directory_path; //! Constructor /*! * Create a new resource claim */ - ResourceClaim(const std::string contentDirectory); + ResourceClaim(const std::string contentDirectory = default_directory_path); //! Destructor virtual ~ResourceClaim() {} //! increaseFlowFileRecordOwnedCount http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/include/Serializable.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Serializable.h b/libminifi/include/Serializable.h new file mode 100644 index 0000000..c32dee0 --- /dev/null +++ b/libminifi/include/Serializable.h @@ -0,0 +1,294 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __SERIALIZABLE_H__ +#define __SERIALIZABLE_H__ + +#include <cstdint> +#include <vector> +#include <string> + + +/** + * Mechanism to determine endianness of host. 
+ * Accounts for only BIG/LITTLE/BIENDIAN + **/ +class EndiannessCheck +{ +public: + static bool IS_LITTLE; +private: + + static bool is_little_endian() { + /* do whatever is needed at static init time */ + unsigned int x = 1; + char *c = (char*) &x; + IS_LITTLE=*c==1; + return IS_LITTLE; + } +}; + + +/** + * DataStream defines the mechanism through which + * binary data will be written to a sink + */ +class DataStream +{ +public: + + DataStream() : readBuffer(0) + { + + } + + /** + * Constructor + **/ + DataStream(const uint8_t *buf, const uint32_t buflen) : DataStream() + { + writeData((uint8_t*)buf,buflen); + + } + + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + int readData(std::vector<uint8_t> &buf, int buflen); + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + int readData(uint8_t *buf, int buflen); + + /** + * writes valiue to buffer + * @param value value to write + * @param size size of value + */ + int writeData(uint8_t *value, int size); + + + /** + * Reads a system word + * @param value value to write + */ + inline int readLongLong(uint64_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + + /** + * Reads a uint32_t + * @param value value to write + */ + inline int readLong(uint32_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + + /** + * Reads a system short + * @param value value to write + */ + inline int readShort(uint16_t &value,bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + + /** + * Returns the underlying buffer + * @return vector's array + **/ + const uint8_t *getBuffer() const + { + return &buffer[0]; + } + + /** + * Retrieve size of data stream + * @return size of data stream + **/ + const uint32_t getSize() const + { + return buffer.size(); + } + +private: + // All serialization related method and internal buf + std::vector<uint8_t> buffer; + uint32_t readBuffer; 
+}; + +/** + * Serializable instances provide base functionality to + * write certain objects/primitives to a data stream. + * + */ +class Serializable { + +public: + + /** + * Inline function to write T to stream + **/ + template<typename T> + inline int writeData(const T &t,DataStream *stream); + + /** + * Inline function to write T to to_vec + **/ + template<typename T> + inline int writeData(const T &t, uint8_t *to_vec); + + /** + * Inline function to write T to to_vec + **/ + template<typename T> + inline int writeData(const T &t, std::vector<uint8_t> &to_vec); + + + /** + * write byte to stream + * @return resulting write size + **/ + int write(uint8_t value,DataStream *stream); + + /** + * write byte to stream + * @return resulting write size + **/ + int write(char value,DataStream *stream); + + /** + * write 4 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + int write(uint32_t base_value,DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write 2 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + int write(uint16_t base_value,DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write valueto stream + * @param value non encoded value + * @param len length of value + * @param strema output stream + * @return resulting write size + **/ + int write(uint8_t *value, int len,DataStream *stream); + + /** + * write 8 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + int write(uint64_t base_value,DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write bool to stream + * 
@param value non encoded value + * @return resulting write size + **/ + int write(bool value); + + /** + * write UTF string to stream + * @param str string to write + * @return resulting write size + **/ + int writeUTF(std::string str,DataStream *stream, bool widen = false); + + /** + * reads a byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint8_t &value,DataStream *stream); + + /** + * reads two bytes from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint16_t &base_value,DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * reads a byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(char &value,DataStream *stream); + + /** + * reads a byte array from the stream + * @param value reference in which will set the result + * @param len length to read + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint8_t *value, int len,DataStream *stream); + + /** + * reads four bytes from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint32_t &value,DataStream *stream, + bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * reads eight byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint64_t &value,DataStream *stream, + bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * read UTF from stream + * @param str reference string + * @param stream stream from which we will read 
+ * @return resulting read size + **/ + int readUTF(std::string &str,DataStream *stream, bool widen = false); + +protected: + +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 2652e35..61e782b 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -23,6 +23,8 @@ Configure *Configure::_configure(NULL); const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file"; const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration"; const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration"; +const char *Configure::nifi_graceful_shutdown_seconds = "nifi.graceful.shutdown.seconds"; +const char *Configure::nifi_log_level = "nifi.log.level"; const char *Configure::nifi_server_name = "nifi.server.name"; const char *Configure::nifi_server_port = "nifi.server.port"; const char *Configure::nifi_server_report_interval= "nifi.server.report.interval"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/FlowControlProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp index fee1a3b..a74e32e 100644 --- a/libminifi/src/FlowControlProtocol.cpp +++ b/libminifi/src/FlowControlProtocol.cpp @@ -346,27 +346,6 @@ int FlowControlProtocol::sendRegisterReq() _logger->log_info("Flow Control Protocol receive report interval %d ms", reportInterval); this->_reportInterval = reportInterval; } - else if (((FlowControlMsgID) msgID) == FLOW_YML_CONTENT) - { - uint32_t yamlLen; - payloadPtr = this->decode(payloadPtr, yamlLen); - _logger->log_info("Flow Control Protocol receive YAML content length %d", yamlLen); - time_t rawtime; - struct tm *timeinfo; 
- time(&rawtime); - timeinfo = localtime(&rawtime); - std::string yamlFileName = "flow."; - yamlFileName += asctime(timeinfo); - yamlFileName += ".yml"; - std::ofstream fs; - fs.open(yamlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - if (fs.is_open()) - { - fs.write((const char *)payloadPtr, yamlLen); - fs.close(); - this->_controller->reload(yamlFileName.c_str()); - } - } else { break; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 4bbc234..b6dc4e9 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -28,60 +28,52 @@ #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> - +#include <future> #include "FlowController.h" #include "ProcessContext.h" -FlowController *FlowController::_flowController(NULL); -FlowController::FlowController(std::string name) -: _name(name) -{ - uuid_generate(_uuid); +FlowController *FlowControllerFactory::_flowController(NULL); - // Setup the default values - _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME; - _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD; - _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD; - _running = false; - _initialized = false; - _root = NULL; - _logger = Logger::getLogger(); - _protocol = new FlowControlProtocol(this); +FlowControllerImpl::FlowControllerImpl(std::string name) { + uuid_generate(_uuid); - // NiFi config properties - _configure = Configure::getConfigure(); + _name = name; + // Setup the default values + _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME; + _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD; + _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD; + _running = false; + _initialized = false; + _root = NULL; + _logger = Logger::getLogger(); + _protocol = new 
FlowControlProtocol(this); - std::string rawConfigFileString; - _configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString); + // NiFi config properties + _configure = Configure::getConfigure(); - if (!rawConfigFileString.empty()) - { - _configurationFileName = rawConfigFileString; - } + std::string rawConfigFileString; + _configure->get(Configure::nifi_flow_configuration_file, + rawConfigFileString); - char *path = NULL; - char full_path[PATH_MAX]; + if (!rawConfigFileString.empty()) { + _configurationFileName = rawConfigFileString; + } - std::string adjustedFilename; - if (!_configurationFileName.empty()) - { - // perform a naive determination if this is a relative path - if (_configurationFileName.c_str()[0] != '/') - { - adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName; + char *path = NULL; + char full_path[PATH_MAX]; + + std::string adjustedFilename; + if (!_configurationFileName.empty()) { + // perform a naive determination if this is a relative path + if (_configurationFileName.c_str()[0] != '/') { + adjustedFilename = adjustedFilename + _configure->getHome() + "/" + + _configurationFileName; + } else { + adjustedFilename = _configurationFileName; } - else - { - adjustedFilename = _configurationFileName; - } - } + } - path = realpath(adjustedFilename.c_str(), full_path); - if (!path) - { - _logger->log_error("Could not locate path from provided configuration file name (%s). 
Exiting.", full_path); - exit(1); - } + path = realpath(adjustedFilename.c_str(), full_path); std::string pathString(path); _configurationFileName = pathString; @@ -90,14 +82,14 @@ FlowController::FlowController(std::string name) // Create the content repo directory if needed struct stat contentDirStat; - if (stat(DEFAULT_CONTENT_DIRECTORY, &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) + if (stat(ResourceClaim::default_directory_path.c_str(), &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) { - path = realpath(DEFAULT_CONTENT_DIRECTORY, full_path); + path = realpath(ResourceClaim::default_directory_path.c_str(), full_path); _logger->log_info("FlowController content directory %s", full_path); } else { - if (mkdir(DEFAULT_CONTENT_DIRECTORY, 0777) == -1) + if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1) { _logger->log_error("FlowController content directory creation failed"); exit(1); @@ -184,529 +176,614 @@ FlowController::FlowController(std::string name) _logger->log_info("Load/Verify Client Certificate OK."); } + + } + if (!path) { + _logger->log_error( + "Could not locate path from provided configuration file name (%s). 
Exiting.", + full_path); + exit(1); } - // Create repos for flow record and provenance - _provenanceRepo = new ProvenanceRepository(); - _provenanceRepo->initialize(); - _logger->log_info("FlowController %s created", _name.c_str()); + // Create repos for flow record and provenance + _provenanceRepo = new ProvenanceRepository(); + _provenanceRepo->initialize(); } -FlowController::~FlowController() -{ - stop(true); - unload(); - delete _protocol; - delete _provenanceRepo; - if (_ctx) - SSL_CTX_free(_ctx); -} +FlowControllerImpl::~FlowControllerImpl() { -bool FlowController::isRunning() -{ - return (_running); + stop(true); + unload(); + if (NULL != _protocol) + delete _protocol; + if (NULL != _provenanceRepo) + delete _provenanceRepo; + } -bool FlowController::isInitialized() -{ - return (_initialized); -} +void FlowControllerImpl::stop(bool force) { -void FlowController::stop(bool force) -{ - if (_running) - { - _logger->log_info("Stop Flow Controller"); - this->_timerScheduler.stop(); - this->_eventScheduler.stop(); - // Wait for sometime for thread stop - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - if (this->_root) - this->_root->stopProcessing( - &this->_timerScheduler, - &this->_eventScheduler); - _running = false; - } -} + if (_running) { + // immediately indicate that we are not running + _running = false; -void FlowController::unload() -{ - if (_running) - { - stop(true); - } - if (_initialized) - { - _logger->log_info("Unload Flow Controller"); - if (_root) - delete _root; - _root = NULL; - _initialized = false; - _name = ""; - } + _logger->log_info("Stop Flow Controller"); + this->_timerScheduler.stop(); + this->_eventScheduler.stop(); + // Wait for sometime for thread stop + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + if (this->_root) + this->_root->stopProcessing(&this->_timerScheduler, + &this->_eventScheduler); - return; + } } +/** + * This function will attempt to unload yaml and stop running Processors. 
+ * + * If the latter attempt fails or does not complete within the prescribed + * period, _running will be set to false and we will return. + * + * @param timeToWaitMs Maximum time to wait before manually + * marking running as false. + */ +void FlowControllerImpl::waitUnload(const uint64_t timeToWaitMs) { + if (_running) { + // use the current time and increment with the provided argument. + std::chrono::system_clock::time_point wait_time = + std::chrono::system_clock::now() + + std::chrono::milliseconds(timeToWaitMs); + + // create an asynchronous future. + std::future<void> unload_task = std::async(std::launch::async, + [this]() {unload();}); + + if (std::future_status::ready == unload_task.wait_until(wait_time)) { + _running = false; + } -Processor *FlowController::createProcessor(std::string name, uuid_t uuid) -{ - Processor *processor = NULL; - if (name == GenerateFlowFile::ProcessorName) - { - processor = new GenerateFlowFile(name, uuid); - } - else if (name == LogAttribute::ProcessorName) - { - processor = new LogAttribute(name, uuid); - } - else if (name == RealTimeDataCollector::ProcessorName) - { - processor = new RealTimeDataCollector(name, uuid); - } - else if (name == GetFile::ProcessorName) - { - processor = new GetFile(name, uuid); - } - else if (name == PutFile::ProcessorName) - { - processor = new PutFile(name, uuid); } - else if (name == TailFile::ProcessorName) - { - processor = new TailFile(name, uuid); - } - else if (name == ListenSyslog::ProcessorName) - { - processor = new ListenSyslog(name, uuid); +} + + +void FlowControllerImpl::unload() { + if (_running) { + stop(true); } - else if (name == ExecuteProcess::ProcessorName) - { - processor = new ExecuteProcess(name, uuid); + if (_initialized) { + _logger->log_info("Unload Flow Controller"); + if (_root) + delete _root; + _root = NULL; + _initialized = false; + _name = ""; } - else if (name == AppendHostInfo::ProcessorName) - { - processor = new AppendHostInfo(name, uuid); + + return; +} + 
+Processor *FlowControllerImpl::createProcessor(std::string name, uuid_t uuid) { + Processor *processor = NULL; + if (name == GenerateFlowFile::ProcessorName) { + processor = new GenerateFlowFile(name, uuid); + } else if (name == LogAttribute::ProcessorName) { + processor = new LogAttribute(name, uuid); + } else if (name == RealTimeDataCollector::ProcessorName) { + processor = new RealTimeDataCollector(name, uuid); + } else if (name == GetFile::ProcessorName) { + processor = new GetFile(name, uuid); + } else if (name == PutFile::ProcessorName) { + processor = new PutFile(name, uuid); + } else if (name == TailFile::ProcessorName) { + processor = new TailFile(name, uuid); + } else if (name == ListenSyslog::ProcessorName) { + processor = new ListenSyslog(name, uuid); + } else if (name == ExecuteProcess::ProcessorName) { + processor = new ExecuteProcess(name, uuid); + } else if (name == AppendHostInfo::ProcessorName) { + processor = new AppendHostInfo(name, uuid); + } else { + _logger->log_error("No Processor defined for %s", name.c_str()); + return NULL; } - else - { - _logger->log_error("No Processor defined for %s", name.c_str()); - return NULL; - } - //! initialize the processor - processor->initialize(); + //! 
initialize the processor + processor->initialize(); - return processor; + return processor; } -ProcessGroup *FlowController::createRootProcessGroup(std::string name, uuid_t uuid) -{ - return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid); +ProcessGroup *FlowControllerImpl::createRootProcessGroup(std::string name, + uuid_t uuid) { + return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid); } -ProcessGroup *FlowController::createRemoteProcessGroup(std::string name, uuid_t uuid) -{ - return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid); +ProcessGroup *FlowControllerImpl::createRemoteProcessGroup(std::string name, + uuid_t uuid) { + return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid); } -Connection *FlowController::createConnection(std::string name, uuid_t uuid) -{ - return new Connection(name, uuid); +Connection *FlowControllerImpl::createConnection(std::string name, + uuid_t uuid) { + return new Connection(name, uuid); } +void FlowControllerImpl::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { + uuid_t uuid; + ProcessGroup *group = NULL; + // generate the random UIID + uuid_generate(uuid); -void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { - uuid_t uuid; - ProcessGroup *group = NULL; + std::string flowName = rootFlowNode["name"].as<std::string>(); - // generate the random UIID - uuid_generate(uuid); + char uuidStr[37]; + uuid_unparse(_uuid, uuidStr); + _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr); + _logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str()); + group = this->createRootProcessGroup(flowName, uuid); + this->_root = group; + this->_name = flowName; +} - std::string flowName = rootFlowNode["name"].as<std::string>(); +void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode, + ProcessGroup *parentGroup) { + int64_t schedulingPeriod = -1; + int64_t penalizationPeriod = -1; + int64_t yieldPeriod = -1; + int64_t runDurationNanos = -1; + uuid_t uuid; + Processor 
*processor = NULL; + + if (!parentGroup) { + _logger->log_error("parseProcessNodeYaml: no parent group exists"); + return; + } - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr); - _logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str()); - group = this->createRootProcessGroup(flowName, uuid); - this->_root = group; - this->_name = flowName; -} + if (processorsNode) { + + if (processorsNode.IsSequence()) { + // Evaluate sequence of processors + int numProcessors = processorsNode.size(); + + for (YAML::const_iterator iter = processorsNode.begin(); + iter != processorsNode.end(); ++iter) { + ProcessorConfig procCfg; + YAML::Node procNode = iter->as<YAML::Node>(); + + procCfg.name = procNode["name"].as<std::string>(); + _logger->log_debug("parseProcessorNode: name => [%s]", + procCfg.name.c_str()); + procCfg.javaClass = procNode["class"].as<std::string>(); + _logger->log_debug("parseProcessorNode: class => [%s]", + procCfg.javaClass.c_str()); + + char uuidStr[37]; + uuid_unparse(_uuid, uuidStr); + + // generate the random UUID + uuid_generate(uuid); + + // Determine the processor name only from the Java class + int lastOfIdx = procCfg.javaClass.find_last_of("."); + if (lastOfIdx != std::string::npos) { + lastOfIdx++; // if a value is found, increment to move beyond the . 
+ int nameLength = procCfg.javaClass.length() - lastOfIdx; + std::string processorName = procCfg.javaClass.substr( + lastOfIdx, nameLength); + processor = this->createProcessor(processorName, uuid); + } -void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, ProcessGroup *parentGroup) { - int64_t schedulingPeriod = -1; - int64_t penalizationPeriod = -1; - int64_t yieldPeriod = -1; - int64_t runDurationNanos = -1; - uuid_t uuid; - Processor *processor = NULL; - - if (!parentGroup) { - _logger->log_error("parseProcessNodeYaml: no parent group exists"); - return; - } + if (!processor) { + _logger->log_error( + "Could not create a processor %s with name %s", + procCfg.name.c_str(), uuidStr); + throw std::invalid_argument( + "Could not create processor " + procCfg.name); + } + processor->setName(procCfg.name); + + procCfg.maxConcurrentTasks = + procNode["max concurrent tasks"].as<std::string>(); + _logger->log_debug( + "parseProcessorNode: max concurrent tasks => [%s]", + procCfg.maxConcurrentTasks.c_str()); + procCfg.schedulingStrategy = procNode["scheduling strategy"].as< + std::string>(); + _logger->log_debug( + "parseProcessorNode: scheduling strategy => [%s]", + procCfg.schedulingStrategy.c_str()); + procCfg.schedulingPeriod = procNode["scheduling period"].as< + std::string>(); + _logger->log_debug( + "parseProcessorNode: scheduling period => [%s]", + procCfg.schedulingPeriod.c_str()); + procCfg.penalizationPeriod = procNode["penalization period"].as< + std::string>(); + _logger->log_debug( + "parseProcessorNode: penalization period => [%s]", + procCfg.penalizationPeriod.c_str()); + procCfg.yieldPeriod = + procNode["yield period"].as<std::string>(); + _logger->log_debug("parseProcessorNode: yield period => [%s]", + procCfg.yieldPeriod.c_str()); + procCfg.yieldPeriod = procNode["run duration nanos"].as< + std::string>(); + _logger->log_debug( + "parseProcessorNode: run duration nanos => [%s]", + procCfg.runDurationNanos.c_str()); + + // handle 
auto-terminated relationships + YAML::Node autoTerminatedSequence = + procNode["auto-terminated relationships list"]; + std::vector<std::string> rawAutoTerminatedRelationshipValues; + if (autoTerminatedSequence.IsSequence() + && !autoTerminatedSequence.IsNull() + && autoTerminatedSequence.size() > 0) { + for (YAML::const_iterator relIter = + autoTerminatedSequence.begin(); + relIter != autoTerminatedSequence.end(); + ++relIter) { + std::string autoTerminatedRel = + relIter->as<std::string>(); + rawAutoTerminatedRelationshipValues.push_back( + autoTerminatedRel); + } + } + procCfg.autoTerminatedRelationships = + rawAutoTerminatedRelationshipValues; + + // handle processor properties + YAML::Node propertiesNode = procNode["Properties"]; + parsePropertiesNodeYaml(&propertiesNode, processor); + + // Take care of scheduling + TimeUnit unit; + if (Property::StringToTime(procCfg.schedulingPeriod, + schedulingPeriod, unit) + && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, + schedulingPeriod)) { + _logger->log_debug( + "convert: parseProcessorNode: schedulingPeriod => [%d] ns", + schedulingPeriod); + processor->setSchedulingPeriodNano(schedulingPeriod); + } - if (processorsNode) { - - if (processorsNode.IsSequence()) { - // Evaluate sequence of processors - int numProcessors = processorsNode.size(); - - - for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) { - ProcessorConfig procCfg; - YAML::Node procNode = iter->as<YAML::Node>(); - - procCfg.name = procNode["name"].as<std::string>(); - _logger->log_debug("parseProcessorNode: name => [%s]", procCfg.name.c_str()); - procCfg.javaClass = procNode["class"].as<std::string>(); - _logger->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass.c_str()); - - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - - // generate the random UUID - uuid_generate(uuid); - - // Determine the processor name only from the Java class - int lastOfIdx = 
procCfg.javaClass.find_last_of("."); - if (lastOfIdx != std::string::npos) { - lastOfIdx++; // if a value is found, increment to move beyond the . - int nameLength = procCfg.javaClass.length() - lastOfIdx; - std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength); - processor = this->createProcessor(processorName, uuid); - } - - if (!processor) { - _logger->log_error("Could not create a processor %s with name %s", procCfg.name.c_str(), uuidStr); - throw std::invalid_argument("Could not create processor " + procCfg.name); - } - processor->setName(procCfg.name); - - procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>(); - _logger->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks.c_str()); - procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>(); - _logger->log_debug("parseProcessorNode: scheduling strategy => [%s]", - procCfg.schedulingStrategy.c_str()); - procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>(); - _logger->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod.c_str()); - procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>(); - _logger->log_debug("parseProcessorNode: penalization period => [%s]", - procCfg.penalizationPeriod.c_str()); - procCfg.yieldPeriod = procNode["yield period"].as<std::string>(); - _logger->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod.c_str()); - procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>(); - _logger->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos.c_str()); - - // handle auto-terminated relationships - YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"]; - std::vector<std::string> rawAutoTerminatedRelationshipValues; - if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() - && 
autoTerminatedSequence.size() > 0) { - for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); - relIter != autoTerminatedSequence.end(); ++relIter) { - std::string autoTerminatedRel = relIter->as<std::string>(); - rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); - } - } - procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues; - - // handle processor properties - YAML::Node propertiesNode = procNode["Properties"]; - parsePropertiesNodeYaml(&propertiesNode, processor); - - // Take care of scheduling - TimeUnit unit; - if (Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) - && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { - _logger->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); - processor->setSchedulingPeriodNano(schedulingPeriod); - } - - if (Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) - && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { - _logger->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", - penalizationPeriod); - processor->setPenalizationPeriodMsec(penalizationPeriod); - } - - if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) - && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { - _logger->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); - processor->setYieldPeriodMsec(yieldPeriod); - } - - // Default to running - processor->setScheduledState(RUNNING); - - if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { - processor->setSchedulingStrategy(TIMER_DRIVEN); - _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); - } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { - processor->setSchedulingStrategy(EVENT_DRIVEN); - _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); - } else { - 
processor->setSchedulingStrategy(CRON_DRIVEN); - _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); - - } - - int64_t maxConcurrentTasks; - if (Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) { - _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - - if (Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) { - _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); - processor->setRunDurationNano(runDurationNanos); - } - - std::set<Relationship> autoTerminatedRelationships; - for (auto &&relString : procCfg.autoTerminatedRelationships) { - Relationship relationship(relString, ""); - _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString.c_str()); - autoTerminatedRelationships.insert(relationship); - } - - processor->setAutoTerminatedRelationships(autoTerminatedRelationships); - - parentGroup->addProcessor(processor); - } - } - } else { - throw new std::invalid_argument( - "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); - } -} + if (Property::StringToTime(procCfg.penalizationPeriod, + penalizationPeriod, unit) + && Property::ConvertTimeUnitToMS(penalizationPeriod, + unit, penalizationPeriod)) { + _logger->log_debug( + "convert: parseProcessorNode: penalizationPeriod => [%d] ms", + penalizationPeriod); + processor->setPenalizationPeriodMsec(penalizationPeriod); + } -void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGroup *parentGroup) { - uuid_t uuid; + if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, + unit) + && Property::ConvertTimeUnitToMS(yieldPeriod, unit, + yieldPeriod)) { + _logger->log_debug( + "convert: parseProcessorNode: yieldPeriod => [%d] ms", + yieldPeriod); + processor->setYieldPeriodMsec(yieldPeriod); + } - if (!parentGroup) { - 
_logger->log_error("parseRemoteProcessGroupYaml: no parent group exists"); - return; - } + // Default to running + processor->setScheduledState(RUNNING); + + if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { + processor->setSchedulingStrategy(TIMER_DRIVEN); + _logger->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy.c_str()); + } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { + processor->setSchedulingStrategy(EVENT_DRIVEN); + _logger->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy.c_str()); + } else { + processor->setSchedulingStrategy(CRON_DRIVEN); + _logger->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy.c_str()); - if (rpgNode) { - if (rpgNode->IsSequence()) { - for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) { - YAML::Node rpgNode = iter->as<YAML::Node>(); + } - auto name = rpgNode["name"].as<std::string>(); - _logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str()); + int64_t maxConcurrentTasks; + if (Property::StringToInt(procCfg.maxConcurrentTasks, + maxConcurrentTasks)) { + _logger->log_debug( + "parseProcessorNode: maxConcurrentTasks => [%d]", + maxConcurrentTasks); + processor->setMaxConcurrentTasks(maxConcurrentTasks); + } - std::string url = rpgNode["url"].as<std::string>(); - _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str()); + if (Property::StringToInt(procCfg.runDurationNanos, + runDurationNanos)) { + _logger->log_debug( + "parseProcessorNode: runDurationNanos => [%d]", + runDurationNanos); + processor->setRunDurationNano(runDurationNanos); + } - std::string timeout = rpgNode["timeout"].as<std::string>(); - _logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str()); + std::set<Relationship> autoTerminatedRelationships; + for (auto &&relString : procCfg.autoTerminatedRelationships) { + Relationship relationship(relString, ""); + _logger->log_debug( + 
"parseProcessorNode: autoTerminatedRelationship => [%s]", + relString.c_str()); + autoTerminatedRelationships.insert(relationship); + } - std::string yieldPeriod = rpgNode["yield period"].as<std::string>(); - _logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod.c_str()); + processor->setAutoTerminatedRelationships( + autoTerminatedRelationships); - YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>(); - YAML::Node outputPorts = rpgNode["Output Ports"].as<YAML::Node>(); - ProcessGroup *group = NULL; + parentGroup->addProcessor(processor); + } + } + } else { + throw new std::invalid_argument( + "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); + } +} - // generate the random UUID - uuid_generate(uuid); +void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, + ProcessGroup *parentGroup) { + uuid_t uuid; - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); + if (!parentGroup) { + _logger->log_error( + "parseRemoteProcessGroupYaml: no parent group exists"); + return; + } - int64_t timeoutValue = -1; - int64_t yieldPeriodValue = -1; + if (rpgNode) { + if (rpgNode->IsSequence()) { + for (YAML::const_iterator iter = rpgNode->begin(); + iter != rpgNode->end(); ++iter) { + YAML::Node rpgNode = iter->as<YAML::Node>(); + + auto name = rpgNode["name"].as<std::string>(); + _logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", + name.c_str()); + + std::string url = rpgNode["url"].as<std::string>(); + _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", + url.c_str()); + + std::string timeout = rpgNode["timeout"].as<std::string>(); + _logger->log_debug( + "parseRemoteProcessGroupYaml: timeout => [%s]", + timeout.c_str()); + + std::string yieldPeriod = + rpgNode["yield period"].as<std::string>(); + _logger->log_debug( + "parseRemoteProcessGroupYaml: yield period => [%s]", + yieldPeriod.c_str()); + + YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>(); 
+ YAML::Node outputPorts = + rpgNode["Output Ports"].as<YAML::Node>(); + ProcessGroup *group = NULL; + + // generate the random UUID + uuid_generate(uuid); + + char uuidStr[37]; + uuid_unparse(_uuid, uuidStr); + + int64_t timeoutValue = -1; + int64_t yieldPeriodValue = -1; + + group = this->createRemoteProcessGroup(name.c_str(), uuid); + group->setParent(parentGroup); + parentGroup->addProcessGroup(group); + + TimeUnit unit; + + if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) + && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, + yieldPeriodValue) && group) { + _logger->log_debug( + "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", + yieldPeriodValue); + group->setYieldPeriodMsec(yieldPeriodValue); + } - group = this->createRemoteProcessGroup(name.c_str(), uuid); - group->setParent(parentGroup); - parentGroup->addProcessGroup(group); + if (Property::StringToTime(timeout, timeoutValue, unit) + && Property::ConvertTimeUnitToMS(timeoutValue, unit, + timeoutValue) && group) { + _logger->log_debug( + "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", + timeoutValue); + group->setTimeOut(timeoutValue); + } - TimeUnit unit; + group->setTransmitting(true); + group->setURL(url); - if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) - && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) { - _logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue); - group->setYieldPeriodMsec(yieldPeriodValue); - } + if (inputPorts && inputPorts.IsSequence()) { + for (YAML::const_iterator portIter = inputPorts.begin(); + portIter != inputPorts.end(); ++portIter) { + _logger->log_debug("Got a current port, iterating..."); - if (Property::StringToTime(timeout, timeoutValue, unit) - && Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) { - _logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue); - group->setTimeOut(timeoutValue); - 
} + YAML::Node currPort = portIter->as<YAML::Node>(); - group->setTransmitting(true); - group->setURL(url); + this->parsePortYaml(&currPort, group, SEND); + } // for node + } + if (outputPorts && outputPorts.IsSequence()) { + for (YAML::const_iterator portIter = outputPorts.begin(); + portIter != outputPorts.end(); ++portIter) { + _logger->log_debug("Got a current port, iterating..."); - if (inputPorts && inputPorts.IsSequence()) { - for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) { - _logger->log_debug("Got a current port, iterating..."); + YAML::Node currPort = portIter->as<YAML::Node>(); - YAML::Node currPort = portIter->as<YAML::Node>(); + this->parsePortYaml(&currPort, group, RECEIVE); + } // for node + } - this->parsePortYaml(&currPort, group, SEND); - } // for node - } - if (outputPorts && outputPorts.IsSequence()) { - for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) { - _logger->log_debug("Got a current port, iterating..."); + } + } + } +} - YAML::Node currPort = portIter->as<YAML::Node>(); +void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode, + ProcessGroup *parent) { + uuid_t uuid; + Connection *connection = NULL; - this->parsePortYaml(&currPort, group, RECEIVE); - } // for node - } + if (!parent) { + _logger->log_error("parseProcessNode: no parent group was provided"); + return; + } - } - } - } -} + if (connectionsNode) { + + if (connectionsNode->IsSequence()) { + for (YAML::const_iterator iter = connectionsNode->begin(); + iter != connectionsNode->end(); ++iter) { + // generate the random UUID + uuid_generate(uuid); + + YAML::Node connectionNode = iter->as<YAML::Node>(); + + std::string name = connectionNode["name"].as<std::string>(); + std::string destName = connectionNode["destination name"].as< + std::string>(); + + char uuidStr[37]; + uuid_unparse(_uuid, uuidStr); + + _logger->log_debug( + "Created connection with UUID %s and 
name %s", uuidStr, + name.c_str()); + connection = this->createConnection(name, uuid); + auto rawRelationship = + connectionNode["source relationship name"].as< + std::string>(); + Relationship relationship(rawRelationship, ""); + _logger->log_debug("parseConnection: relationship => [%s]", + rawRelationship.c_str()); + if (connection) + connection->setRelationship(relationship); + std::string connectionSrcProcName = + connectionNode["source name"].as<std::string>(); + + Processor *srcProcessor = this->_root->findProcessor( + connectionSrcProcName); + + if (!srcProcessor) { + _logger->log_error( + "Could not locate a source with name %s to create a connection", + connectionSrcProcName.c_str()); + throw std::invalid_argument( + "Could not locate a source with name %s to create a connection " + + connectionSrcProcName); + } -void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGroup *parent) { - uuid_t uuid; - Connection *connection = NULL; + Processor *destProcessor = this->_root->findProcessor(destName); + // If we could not find name, try by UUID + if (!destProcessor) { + uuid_t destUuid; + uuid_parse(destName.c_str(), destUuid); + destProcessor = this->_root->findProcessor(destUuid); + } + if (destProcessor) { + std::string destUuid = destProcessor->getUUIDStr(); + } - if (!parent) { - _logger->log_error("parseProcessNode: no parent group was provided"); - return; - } + uuid_t srcUuid; + uuid_t destUuid; + srcProcessor->getUUID(srcUuid); + connection->setSourceProcessorUUID(srcUuid); + destProcessor->getUUID(destUuid); + connection->setDestinationProcessorUUID(destUuid); - if (connectionsNode) { - - if (connectionsNode->IsSequence()) { - for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) { - // generate the random UUID - uuid_generate(uuid); - - YAML::Node connectionNode = iter->as<YAML::Node>(); - - std::string name = connectionNode["name"].as<std::string>(); - std::string destName = 
connectionNode["destination name"].as<std::string>(); - - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - - _logger->log_debug("Created connection with UUID %s and name %s", uuidStr, name.c_str()); - connection = this->createConnection(name, uuid); - auto rawRelationship = connectionNode["source relationship name"].as<std::string>(); - Relationship relationship(rawRelationship, ""); - _logger->log_debug("parseConnection: relationship => [%s]", rawRelationship.c_str()); - if (connection) - connection->setRelationship(relationship); - std::string connectionSrcProcName = connectionNode["source name"].as<std::string>(); - - Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName); - - if (!srcProcessor) { - _logger->log_error("Could not locate a source with name %s to create a connection", - connectionSrcProcName.c_str()); - throw std::invalid_argument( - "Could not locate a source with name %s to create a connection " + connectionSrcProcName); - } - - Processor *destProcessor = this->_root->findProcessor(destName); - // If we could not find name, try by UUID - if (!destProcessor) { - uuid_t destUuid; - uuid_parse(destName.c_str(), destUuid); - destProcessor = this->_root->findProcessor(destUuid); - } - if (destProcessor) { - std::string destUuid = destProcessor->getUUIDStr(); - } - - uuid_t srcUuid; - uuid_t destUuid; - srcProcessor->getUUID(srcUuid); - connection->setSourceProcessorUUID(srcUuid); - destProcessor->getUUID(destUuid); - connection->setDestinationProcessorUUID(destUuid); - - if (connection) { - parent->addConnection(connection); - } - } - } - - if (connection) - parent->addConnection(connection); - - return; - } -} + if (connection) { + parent->addConnection(connection); + } + } + } + + if (connection) + parent->addConnection(connection); + return; + } +} -void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction) { - uuid_t uuid; - Processor *processor = NULL; - 
RemoteProcessorGroupPort *port = NULL; +void FlowControllerImpl::parsePortYaml(YAML::Node *portNode, + ProcessGroup *parent, TransferDirection direction) { + uuid_t uuid; + Processor *processor = NULL; + RemoteProcessorGroupPort *port = NULL; - if (!parent) { - _logger->log_error("parseProcessNode: no parent group existed"); - return; - } + if (!parent) { + _logger->log_error("parseProcessNode: no parent group existed"); + return; + } - YAML::Node inputPortsObj = portNode->as<YAML::Node>(); + YAML::Node inputPortsObj = portNode->as<YAML::Node>(); - // generate the random UIID - uuid_generate(uuid); + // generate the random UIID + uuid_generate(uuid); - auto portId = inputPortsObj["id"].as<std::string>(); - auto nameStr = inputPortsObj["name"].as<std::string>(); - uuid_parse(portId.c_str(), uuid); + auto portId = inputPortsObj["id"].as<std::string>(); + auto nameStr = inputPortsObj["name"].as<std::string>(); + uuid_parse(portId.c_str(), uuid); - port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid); + port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid); - processor = (Processor *) port; - port->setDirection(direction); - port->setTimeOut(parent->getTimeOut()); - port->setTransmitting(true); - processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); - processor->initialize(); + processor = (Processor *) port; + port->setDirection(direction); + port->setTimeOut(parent->getTimeOut()); + port->setTransmitting(true); + processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); + processor->initialize(); - // handle port properties - YAML::Node nodeVal = portNode->as<YAML::Node>(); - YAML::Node propertiesNode = nodeVal["Properties"]; + // handle port properties + YAML::Node nodeVal = portNode->as<YAML::Node>(); + YAML::Node propertiesNode = nodeVal["Properties"]; - parsePropertiesNodeYaml(&propertiesNode, processor); + parsePropertiesNodeYaml(&propertiesNode, processor); - // add processor to parent - parent->addProcessor(processor); - 
processor->setScheduledState(RUNNING); - auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>(); - int64_t maxConcurrentTasks; - if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); + // add processor to parent + parent->addProcessor(processor); + processor->setScheduledState(RUNNING); + auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as< + std::string>(); + int64_t maxConcurrentTasks; + if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { + processor->setMaxConcurrentTasks(maxConcurrentTasks); + } + _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", + maxConcurrentTasks); + processor->setMaxConcurrentTasks(maxConcurrentTasks); } - -void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor) -{ - // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated - for (YAML::const_iterator propsIter = propertiesNode->begin(); propsIter != propertiesNode->end(); ++propsIter) - { - std::string propertyName = propsIter->first.as<std::string>(); - YAML::Node propertyValueNode = propsIter->second; - if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) - { - std::string rawValueString = propertyValueNode.as<std::string>(); - if (!processor->setProperty(propertyName, rawValueString)) - { - _logger->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName.c_str(), rawValueString.c_str(), processor->getName().c_str()); - } - } - } +void FlowControllerImpl::parsePropertiesNodeYaml(YAML::Node *propertiesNode, + Processor *processor) { + // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated + 
for (YAML::const_iterator propsIter = propertiesNode->begin(); + propsIter != propertiesNode->end(); ++propsIter) { + std::string propertyName = propsIter->first.as<std::string>(); + YAML::Node propertyValueNode = propsIter->second; + if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) { + std::string rawValueString = propertyValueNode.as<std::string>(); + if (!processor->setProperty(propertyName, rawValueString)) { + _logger->log_warn( + "Received property %s with value %s but is not one of the properties for %s", + propertyName.c_str(), rawValueString.c_str(), + processor->getName().c_str()); + } + } + } } -void FlowController::load() { +void FlowControllerImpl::load() { if (_running) { stop(true); } @@ -732,7 +809,7 @@ void FlowController::load() { } } -void FlowController::reload(std::string yamlFile) +void FlowControllerImpl::reload(std::string yamlFile) { _logger->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str()); stop(true); @@ -752,22 +829,24 @@ void FlowController::reload(std::string yamlFile) } } -bool FlowController::start() { - if (!_initialized) { - _logger->log_error("Can not start Flow Controller because it has not been initialized"); - return false; - } else { - if (!_running) { - _logger->log_info("Start Flow Controller"); - this->_timerScheduler.start(); - this->_eventScheduler.start(); - if (this->_root) - this->_root->startProcessing( - &this->_timerScheduler, - &this->_eventScheduler); - _running = true; - this->_protocol->start(); - } - return true; - } +bool FlowControllerImpl::start() { + if (!_initialized) { + _logger->log_error( + "Can not start Flow Controller because it has not been initialized"); + return false; + } else { + + if (!_running) { + _logger->log_info("Starting Flow Controller"); + this->_timerScheduler.start(); + this->_eventScheduler.start(); + if (this->_root) + this->_root->startProcessing(&this->_timerScheduler, + &this->_eventScheduler); + _running = true; + 
this->_protocol->start(); + _logger->log_info("Started Flow Controller"); + } + return true; + } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 2dda47a..9d01016 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -83,7 +83,7 @@ FlowFileRecord::~FlowFileRecord() { // Decrease the flow file record owned count for the resource claim _claim->decreaseFlowFileRecordOwnedCount(); - if (_claim->getFlowFileRecordOwnedCount() == 0) + if (_claim->getFlowFileRecordOwnedCount() <= 0) { _logger->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str()); std::remove(_claim->getContentFullPath().c_str()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/09d973ba/libminifi/src/GetFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/GetFile.cpp b/libminifi/src/GetFile.cpp index 70969c9..53d285d 100644 --- a/libminifi/src/GetFile.cpp +++ b/libminifi/src/GetFile.cpp @@ -32,8 +32,12 @@ #include <dirent.h> #include <limits.h> #include <unistd.h> +#if (__GNUC__ >= 4) + #if (__GNUC_MINOR__ < 9) + #include <regex.h> + #endif +#endif #include <regex> - #include "TimeUtil.h" #include "GetFile.h" #include "ProcessContext.h" @@ -81,6 +85,8 @@ void GetFile::initialize() void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) { std::string value; + + _logger->log_info("onTrigger GetFile"); if (context->getProperty(Directory.getName(), value)) { _directory = value; @@ -97,6 +103,8 @@ void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) { Property::StringToBool(value, _keepSourceFile); } + + _logger->log_info("onTrigger GetFile"); if (context->getProperty(MaxAge.getName(), value)) { TimeUnit unit; @@ -143,6 +151,7 @@ void 
GetFile::onTrigger(ProcessContext *context, ProcessSession *session) } // Perform directory list + _logger->log_info("Is listing empty %i",isListingEmpty()); if (isListingEmpty()) { if (_pollInterval == 0 || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval) @@ -150,6 +159,7 @@ void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) performListing(_directory); } } + _logger->log_info("Is listing empty %i",isListingEmpty()); if (!isListingEmpty()) { @@ -159,6 +169,7 @@ void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) pollListing(list, _batchSize); while (!list.empty()) { + std::string fileName = list.front(); list.pop(); _logger->log_info("GetFile process %s", fileName.c_str()); @@ -185,6 +196,7 @@ void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) throw; } } + } bool GetFile::isListingEmpty() @@ -243,16 +255,34 @@ bool GetFile::acceptFile(std::string fullName, std::string name) if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0) return false; - try { - std::regex re(_fileFilter); - if (!std::regex_match(name, re)) { - return false; - } - } catch (std::regex_error e) { - _logger->log_error("Invalid File Filter regex: %s.", e.what()); - return false; - } + #ifdef __GNUC__ + #if (__GNUC__ >= 4) + #if (__GNUC_MINOR__ < 9) + regex_t regex; + int ret = regcomp(®ex, _fileFilter.c_str(),0); + if (ret) + return false; + ret = regexec(®ex,name.c_str(),(size_t)0,NULL,0); + regfree(®ex); + if (ret) + return false; + #else + try{ + std::regex re(_fileFilter); + + if (!std::regex_match(name, re)) { + return false; + } + } catch (std::regex_error e) { + _logger->log_error("Invalid File Filter regex: %s.", e.what()); + return false; + } + #endif + #endif + #else + _logger->log_info("Cannot support regex filtering"); + #endif return true; } @@ -261,11 +291,14 @@ bool GetFile::acceptFile(std::string fullName, std::string name) void GetFile::performListing(std::string dir) { + 
_logger->log_info("Performing file listing against %s",dir.c_str()); DIR *d; d = opendir(dir.c_str()); if (!d) return; - while (1) + // only perform a listing while we are not empty + _logger->log_info("Performing file listing against %s",dir.c_str()); + while (isRunning()) { struct dirent *entry; entry = readdir(d);
