http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/io/tls/TLSSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h index c56f6c8..b8915a6 100644 --- a/libminifi/include/io/tls/TLSSocket.h +++ b/libminifi/include/io/tls/TLSSocket.h @@ -38,27 +38,24 @@ namespace io { #define TLS_ERROR_KEY_ERROR 4 #define TLS_ERROR_CERT_ERROR 5 -class OpenSSLInitializer -{ +class OpenSSLInitializer { public: static OpenSSLInitializer *getInstance() { - OpenSSLInitializer* atomic_context = context_instance.load( - std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_acquire); - if (atomic_context == nullptr) { - std::lock_guard<std::mutex> lock(context_mutex); - atomic_context = context_instance.load(std::memory_order_relaxed); - if (atomic_context == nullptr) { - atomic_context = new OpenSSLInitializer(); - std::atomic_thread_fence(std::memory_order_release); - context_instance.store(atomic_context, std::memory_order_relaxed); - } - } - return atomic_context; - } - - OpenSSLInitializer() - { + OpenSSLInitializer* atomic_context = context_instance.load(std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_acquire); + if (atomic_context == nullptr) { + std::lock_guard<std::mutex> lock(context_mutex); + atomic_context = context_instance.load(std::memory_order_relaxed); + if (atomic_context == nullptr) { + atomic_context = new OpenSSLInitializer(); + std::atomic_thread_fence(std::memory_order_release); + context_instance.store(atomic_context, std::memory_order_relaxed); + } + } + return atomic_context; + } + + OpenSSLInitializer() { SSL_library_init(); OpenSSL_add_all_algorithms(); SSL_load_error_strings(); @@ -68,11 +65,11 @@ class OpenSSLInitializer static std::mutex context_mutex; }; -class TLSContext: public SocketContext { +class TLSContext : public SocketContext { public: TLSContext(const std::shared_ptr<Configure> &configure); - + virtual ~TLSContext() { if (0 != ctx) SSL_CTX_free(ctx); @@ -93,8 +90,7 @@ class TLSContext: public SocketContext { static int pemPassWordCb(char *buf, int size, int rwflag, void *configure) { std::string passphrase; - if (static_cast<Configure*>(configure)->get( - Configure::nifi_security_client_pass_phrase, passphrase)) { + if (static_cast<Configure*>(configure)->get(Configure::nifi_security_client_pass_phrase, passphrase)) { std::ifstream file(passphrase.c_str(), std::ifstream::in); if (!file.good()) { @@ -103,8 +99,7 @@ class TLSContext: public SocketContext { } std::string password; - password.assign((std::istreambuf_iterator<char>(file)), - std::istreambuf_iterator<char>()); + password.assign((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>()); file.close(); memset(buf, 0x00, size); memcpy(buf, password.c_str(), password.length() - 1); @@ -114,7 +109,6 @@ class TLSContext: public SocketContext { return 0; } - std::shared_ptr<logging::Logger> logger_; std::shared_ptr<Configure> configure_; SSL_CTX *ctx; @@ -133,8 +127,7 @@ class TLSSocket : public Socket { * @param port connecting port * @param listeners number of listeners in the queue */ - explicit TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port, - const uint16_t listeners); + explicit TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners); /** * Constructor that creates a client socket.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/io/validation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/validation.h b/libminifi/include/io/validation.h index a5e1bc5..70de439 100644 --- a/libminifi/include/io/validation.h +++ b/libminifi/include/io/validation.h @@ -45,8 +45,7 @@ class size_function_functor_checker { * Determines if the variable is null or ::size() == 0 */ template<typename T> -static auto IsNullOrEmpty( - T &object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type { +static auto IsNullOrEmpty(T &object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type { return object.size() == 0; } @@ -54,8 +53,7 @@ static auto IsNullOrEmpty( * Determines if the variable is null or ::size() == 0 */ template<typename T> -static auto IsNullOrEmpty( - T *object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type { +static auto IsNullOrEmpty(T *object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type { return (nullptr == object || object->size() == 0); } @@ -63,8 +61,7 @@ static auto IsNullOrEmpty( * Determines if the variable is null or ::size() == 0 */ template<typename T> -static auto IsNullOrEmpty( - T *object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type { +static auto IsNullOrEmpty(T *object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type { return (nullptr == object); } @@ -72,8 +69,7 @@ static auto IsNullOrEmpty( * Determines if the variable is null or ::size() == 0 */ template<typename T> -static auto IsNullOrEmpty( - std::shared_ptr<T> object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type { +static auto IsNullOrEmpty(std::shared_ptr<T> object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type { return (nullptr == object || nullptr == object.get()); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/AppendHostInfo.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/AppendHostInfo.h b/libminifi/include/processors/AppendHostInfo.h index 610251f..e7c90ac 100644 --- a/libminifi/include/processors/AppendHostInfo.h +++ b/libminifi/include/processors/AppendHostInfo.h @@ -43,7 +43,7 @@ class AppendHostInfo : public core::Processor { */ AppendHostInfo(std::string name, uuid_t uuid = NULL) : core::Processor(name, uuid), - logger_(logging::LoggerFactory<AppendHostInfo>::getLogger()){ + logger_(logging::LoggerFactory<AppendHostInfo>::getLogger()) { } // Destructor virtual ~AppendHostInfo() { @@ -60,8 +60,7 @@ class AppendHostInfo : public core::Processor { public: // OnTrigger method, implemented by NiFi AppendHostInfo - virtual void onTrigger(core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); // Initialize, over write by NiFi AppendHostInfo virtual void initialize(void); @@ -74,7 +73,6 @@ class AppendHostInfo : public core::Processor { REGISTER_RESOURCE(AppendHostInfo); - } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/ExecuteProcess.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ExecuteProcess.h b/libminifi/include/processors/ExecuteProcess.h index c719405..28dcf76 100644 --- a/libminifi/include/processors/ExecuteProcess.h +++ b/libminifi/include/processors/ExecuteProcess.h @@ -53,7 +53,7 @@ class ExecuteProcess : public core::Processor { */ ExecuteProcess(std::string name, uuid_t uuid = NULL) : Processor(name, uuid), - logger_(logging::LoggerFactory<ExecuteProcess>::getLogger()){ + logger_(logging::LoggerFactory<ExecuteProcess>::getLogger()) { _redirectErrorStream = false; _batchDuration = 0; _workingDir = "."; @@ -93,8 +93,7 @@ class ExecuteProcess : public core::Processor { public: // OnTrigger method, implemented by NiFi ExecuteProcess - virtual void onTrigger(core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); // Initialize, over write by NiFi ExecuteProcess virtual void initialize(void); @@ -107,8 +106,7 @@ class ExecuteProcess : public core::Processor { std::string _command; std::string _commandArgument; std::string _workingDir; - int64_t _batchDuration; - bool _redirectErrorStream; + int64_t _batchDuration;bool _redirectErrorStream; // Full command std::string _fullCommand; // whether the process is running http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/GenerateFlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GenerateFlowFile.h b/libminifi/include/processors/GenerateFlowFile.h index 2f24e64..abb5740 100644 --- a/libminifi/include/processors/GenerateFlowFile.h +++ b/libminifi/include/processors/GenerateFlowFile.h @@ -76,8 +76,7 @@ class GenerateFlowFile : public core::Processor { public: // OnTrigger method, implemented by NiFi GenerateFlowFile - virtual void onTrigger(core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); // Initialize, over write by NiFi GenerateFlowFile virtual void initialize(void); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/GetFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h index 8864601..df8f775 100644 --- a/libminifi/include/processors/GetFile.h +++ b/libminifi/include/processors/GetFile.h @@ -33,14 +33,11 @@ namespace minifi { namespace processors { struct GetFileRequest { - std::string directory = "."; - bool recursive = true; - bool keepSourceFile = false; + std::string directory = ".";bool recursive = true;bool keepSourceFile = false; int64_t minAge = 0; int64_t maxAge = 0; int64_t minSize = 0; - int64_t maxSize = 0; - bool ignoreHiddenFile = true; + int64_t maxSize = 0;bool ignoreHiddenFile = true; int64_t pollInterval = 0; int64_t batchSize = 10; std::string fileFilter = "[^\\.].*"; @@ -54,7 +51,8 @@ class GetFile : public core::Processor { * Create a new processor */ explicit GetFile(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid), logger_(logging::LoggerFactory<GetFile>::getLogger()) { + : Processor(name, uuid), + logger_(logging::LoggerFactory<GetFile>::getLogger()) { } // Destructor @@ -84,15 +82,13 @@ class GetFile : public core::Processor { * @param sessionFactory process session factory that is used when creating * ProcessSession objects. */ - void onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory); + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); /** * Execution trigger for the GetFile Processor * @param context processor context * @param session processor session reference. */ - virtual void onTrigger(core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); // Initialize, over write by NiFi GetFile virtual void initialize(void); @@ -119,11 +115,9 @@ class GetFile : public core::Processor { // Put full path file name into directory listing void putListing(std::string fileName); // Poll directory listing for files - void pollListing(std::queue<std::string> &list, - const GetFileRequest &request); + void pollListing(std::queue<std::string> &list, const GetFileRequest &request); // Check whether file can be added to the directory listing - bool acceptFile(std::string fullName, std::string name, - const GetFileRequest &request); + bool acceptFile(std::string fullName, std::string name, const GetFileRequest &request); // Get file request object. GetFileRequest request_; // Mutex for protection of the directory listing http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/InvokeHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/InvokeHTTP.h b/libminifi/include/processors/InvokeHTTP.h index 59bc2bd..ab78fd5 100644 --- a/libminifi/include/processors/InvokeHTTP.h +++ b/libminifi/include/processors/InvokeHTTP.h @@ -53,10 +53,8 @@ struct HTTPRequestResponse { /** * Receive HTTP Response. */ - static size_t recieve_write(char * data, size_t size, size_t nmemb, - void * p) { - return static_cast<HTTPRequestResponse*>(p)->write_content(data, size, - nmemb); + static size_t recieve_write(char * data, size_t size, size_t nmemb, void * p) { + return static_cast<HTTPRequestResponse*>(p)->write_content(data, size, nmemb); } /** @@ -160,8 +158,7 @@ class InvokeHTTP : public core::Processor { void onTrigger(core::ProcessContext *context, core::ProcessSession *session); void initialize(); - void onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory); + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); protected: @@ -194,9 +191,7 @@ class InvokeHTTP : public core::Processor { */ void set_request_method(CURL *curl, const std::string &); - struct curl_slist *build_header_list( - CURL *curl, std::string regex, - const std::map<std::string, std::string> &); + struct curl_slist *build_header_list(CURL *curl, std::string regex, const std::map<std::string, std::string> &); bool matches(const std::string &value, const std::string &sregex); @@ -209,10 +204,8 @@ class InvokeHTTP : public core::Processor { * @param isSuccess success code or not * @param statuscode http response code. */ - void route(std::shared_ptr<FlowFileRecord> &request, - std::shared_ptr<FlowFileRecord> &response, - core::ProcessSession *session, core::ProcessContext *context, - bool isSuccess, + void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, + bool isSuccess, int statusCode); /** * Determine if we should emit a new flowfile based on our activity http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h index 03fe03d..c9e42bc 100644 --- a/libminifi/include/processors/ListenHTTP.h +++ b/libminifi/include/processors/ListenHTTP.h @@ -68,17 +68,13 @@ class ListenHTTP : public core::Processor { void onTrigger(core::ProcessContext *context, core::ProcessSession *session); void initialize(); - void onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory); + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); // HTTP request handler class Handler : public CivetHandler { public: - Handler(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory, - std::string &&authDNPattern, - std::string &&headersAsAttributesPattern); - bool handlePost(CivetServer *server, struct mg_connection *conn); + Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern);bool handlePost( + CivetServer *server, struct mg_connection *conn); private: // Send HTTP 500 error response to client @@ -95,8 +91,7 @@ class ListenHTTP : public core::Processor { // Write callback for transferring data from HTTP request to content repo class WriteCallback : public OutputStreamCallback { public: - WriteCallback(struct mg_connection *conn, - const struct mg_request_info *reqInfo); + WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo); void process(std::ofstream *stream); private: @@ -115,7 +110,6 @@ class ListenHTTP : public core::Processor { std::unique_ptr<Handler> _handler; }; - REGISTER_RESOURCE(ListenHTTP); } /* namespace processors */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/ListenSyslog.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h index f0380db..ed54b44 100644 --- a/libminifi/include/processors/ListenSyslog.h +++ b/libminifi/include/processors/ListenSyslog.h @@ -127,8 +127,7 @@ class ListenSyslog : public core::Processor { public: // OnTrigger method, implemented by NiFi ListenSyslog - virtual void onTrigger(core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); // Initialize, over write by NiFi ListenSyslog virtual void initialize(void); @@ -193,8 +192,7 @@ class ListenSyslog : public core::Processor { int64_t _maxBatchSize; std::string _messageDelimiter; std::string _protocol; - int64_t _port; - bool _parseMessages; + int64_t _port;bool _parseMessages; int _serverSocket; std::vector<int> _clientSockets; int _maxFds; @@ -202,8 +200,7 @@ class ListenSyslog : public core::Processor { // thread std::thread *_thread; // whether to reset the server socket - bool _resetServerSocket; - bool _serverTheadRunning; + bool _resetServerSocket;bool _serverTheadRunning; // buffer for read socket char _buffer[2048]; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/LoadProcessors.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/LoadProcessors.h b/libminifi/include/processors/LoadProcessors.h index 7a16773..3e6cfcf 100644 --- a/libminifi/include/processors/LoadProcessors.h +++ b/libminifi/include/processors/LoadProcessors.h @@ -30,5 +30,4 @@ #include "PutFile.h" #include "TailFile.h" - #endif /* LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/LogAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h index cbb8f1a..88230f7 100644 --- a/libminifi/include/processors/LogAttribute.h +++ b/libminifi/include/processors/LogAttribute.h @@ -110,8 +110,7 @@ class LogAttribute : public core::Processor { public: // OnTrigger method, implemented by NiFi LogAttribute - virtual void onTrigger(core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); // Initialize, over write by NiFi LogAttribute virtual void initialize(void); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/PutFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h index 3918d0a..d08ebbc 100644 --- a/libminifi/include/processors/PutFile.h +++ b/libminifi/include/processors/PutFile.h @@ -48,7 +48,8 @@ class PutFile : public core::Processor { * Create a new processor */ PutFile(std::string name, uuid_t uuid = NULL) - : core::Processor(name, uuid), logger_(logging::LoggerFactory<PutFile>::getLogger()) { + : core::Processor(name, uuid), + logger_(logging::LoggerFactory<PutFile>::getLogger()) { } // Destructor virtual ~PutFile() { @@ -67,12 +68,10 @@ class PutFile : public core::Processor { * @param sessionFactory process session factory that is used when creating * ProcessSession objects. */ - virtual void onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory); + virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); // OnTrigger method, implemented by NiFi PutFile - virtual void onTrigger(core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); // Initialize, over write by NiFi PutFile virtual void initialize(void); @@ -80,13 +79,11 @@ class PutFile : public core::Processor { public: ReadCallback(const std::string &tmpFile, const std::string &destFile); ~ReadCallback(); - virtual void process(std::ifstream *stream); - bool commit(); + virtual void process(std::ifstream *stream);bool commit(); private: std::shared_ptr<logging::Logger> logger_; - std::ofstream _tmpFileOs; - bool _writeSucceeded = false; + std::ofstream _tmpFileOs;bool _writeSucceeded = false; std::string _tmpFile; std::string _destFile; }; @@ -100,9 +97,7 @@ class PutFile : public core::Processor { // conflict resolution type. std::string conflict_resolution_; - bool putFile(core::ProcessSession *session, - std::shared_ptr<FlowFileRecord> flowFile, - const std::string &tmpFile, const std::string &destFile); + bool putFile(core::ProcessSession *session, std::shared_ptr<FlowFileRecord> flowFile, const std::string &tmpFile, const std::string &destFile); std::shared_ptr<logging::Logger> logger_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/processors/TailFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h index 47ec3fb..59cf224 100644 --- a/libminifi/include/processors/TailFile.h +++ b/libminifi/include/processors/TailFile.h @@ -41,7 +41,8 @@ class TailFile : public core::Processor { * Create a new processor */ explicit TailFile(std::string name, uuid_t uuid = NULL) - : core::Processor(name, uuid), logger_(logging::LoggerFactory<TailFile>::getLogger()) { + : core::Processor(name, uuid), + logger_(logging::LoggerFactory<TailFile>::getLogger()) { _stateRecovered = false; } // Destructor @@ -58,8 +59,7 @@ class TailFile : public core::Processor { public: // OnTrigger method, implemented by NiFi TailFile - virtual void onTrigger(core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); // Initialize, over write by NiFi TailFile virtual void initialize(void); // recoverState http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/properties/Properties.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h index 8d08e8d..8797c19 100644 --- a/libminifi/include/properties/Properties.h +++ b/libminifi/include/properties/Properties.h @@ -38,11 +38,11 @@ namespace minifi { class Properties { public: Properties(); - + virtual ~Properties() { } - + // Clear the load config void clear() { std::lock_guard<std::mutex> lock(mutex_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/provenance/Provenance.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h index 86ee713..1fa4a72 100644 --- a/libminifi/include/provenance/Provenance.h +++ b/libminifi/include/provenance/Provenance.h @@ -49,8 +49,7 @@ namespace provenance { #define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048 // Provenance Event Record -class ProvenanceEventRecord : - protected org::apache::nifi::minifi::io::Serializable { +class ProvenanceEventRecord : protected org::apache::nifi::minifi::io::Serializable { public: enum ProvenanceEventType { @@ -155,14 +154,14 @@ class ProvenanceEventRecord : */ REPLAY }; - static const char *ProvenanceEventTypeStr[REPLAY+1]; - public: + static const char *ProvenanceEventTypeStr[REPLAY + 1]; + public: // Constructor /*! * Create a new provenance event record */ - ProvenanceEventRecord(ProvenanceEventType event, std::string componentId, - std::string componentType): logger_(logging::LoggerFactory<ProvenanceEventRecord>::getLogger()) { + ProvenanceEventRecord(ProvenanceEventType event, std::string componentId, std::string componentType) + : logger_(logging::LoggerFactory<ProvenanceEventRecord>::getLogger()) { _eventType = event; _componentId = componentId; _componentType = componentType; @@ -174,7 +173,8 @@ class ProvenanceEventRecord : _eventIdStr = eventIdStr; } - ProvenanceEventRecord(): logger_(logging::LoggerFactory<ProvenanceEventRecord>::getLogger()) { + ProvenanceEventRecord() + : logger_(logging::LoggerFactory<ProvenanceEventRecord>::getLogger()) { _eventTime = getTimeMillis(); } @@ -271,8 +271,7 @@ class ProvenanceEventRecord : } // Add Parent UUID void addParentUuid(std::string uuid) { - if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) - != _parentUuids.end()) + if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end()) return; else _parentUuids.push_back(uuid); @@ -284,9 +283,7 @@ class ProvenanceEventRecord : } // Remove Parent UUID void removeParentUuid(std::string uuid) { - _parentUuids.erase( - std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), - _parentUuids.end()); + _parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), _parentUuids.end()); } // Remove Parent Flow File void removeParentFlowFile(std::shared_ptr<core::FlowFile> flow) { @@ -299,8 +296,7 @@ class ProvenanceEventRecord : } // Add Child UUID void addChildUuid(std::string uuid) { - if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) - != _childrenUuids.end()) + if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end()) return; else _childrenUuids.push_back(uuid); @@ -312,9 +308,7 @@ class ProvenanceEventRecord : } // Remove Child UUID void removeChildUuid(std::string uuid) { - _childrenUuids.erase( - std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), - _childrenUuids.end()); + _childrenUuids.erase(std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), _childrenUuids.end()); } // Remove Child Flow File void removeChildFlowFile(std::shared_ptr<core::FlowFile> flow) { @@ -369,8 +363,7 @@ class ProvenanceEventRecord : return DeSerialize(stream.getBuffer(), stream.getSize()); } // DeSerialize - bool DeSerialize(const std::shared_ptr<core::Repository> &repo, - std::string key); + bool DeSerialize(const std::shared_ptr<core::Repository> &repo, std::string key); protected: @@ -440,8 +433,8 @@ class ProvenanceReporter { /*! * Create a new provenance reporter associated with the process session */ - ProvenanceReporter(std::shared_ptr<core::Repository> repo, - std::string componentId, std::string componentType) : logger_(logging::LoggerFactory<ProvenanceReporter>::getLogger()) { + ProvenanceReporter(std::shared_ptr<core::Repository> repo, std::string componentId, std::string componentType) + : logger_(logging::LoggerFactory<ProvenanceReporter>::getLogger()) { _componentId = componentId; _componentType = componentType; repo_ = repo; @@ -474,12 +467,8 @@ class ProvenanceReporter { _events.clear(); } // allocate - ProvenanceEventRecord *allocate( - ProvenanceEventRecord::ProvenanceEventType eventType, - std::shared_ptr<core::FlowFile> flow) { - ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType, - _componentId, - _componentType); + ProvenanceEventRecord *allocate(ProvenanceEventRecord::ProvenanceEventType eventType, std::shared_ptr<core::FlowFile> flow) { + ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType, _componentId, _componentType); if (event) event->fromFlowFile(flow); @@ -490,39 +479,27 @@ class ProvenanceReporter { // create void create(std::shared_ptr<core::FlowFile> flow, std::string detail); // route - void route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, - std::string detail, uint64_t processingDuration); + void route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, uint64_t processingDuration); // modifyAttributes - void modifyAttributes(std::shared_ptr<core::FlowFile> flow, - std::string detail); + void modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string detail); // modifyContent - void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, - uint64_t processingDuration); + void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, uint64_t processingDuration); // clone - void clone(std::shared_ptr<core::FlowFile> parent, - std::shared_ptr<core::FlowFile> child); + void clone(std::shared_ptr<core::FlowFile> parent, std::shared_ptr<core::FlowFile> child); // join - void join(std::vector<std::shared_ptr<core::FlowFile> > parents, - std::shared_ptr<core::FlowFile> child, std::string detail, - uint64_t processingDuration); + void join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, uint64_t processingDuration); // fork - void fork(std::vector<std::shared_ptr<core::FlowFile> > child, - std::shared_ptr<core::FlowFile> parent, std::string detail, - uint64_t processingDuration); + void fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, uint64_t processingDuration); // expire void expire(std::shared_ptr<core::FlowFile> flow, std::string detail); // drop void drop(std::shared_ptr<core::FlowFile> flow, std::string reason); // send - void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, - std::string detail, uint64_t processingDuration, bool force); + void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force); // fetch - void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, - std::string detail, uint64_t processingDuration); + void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration); // receive - void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, - std::string sourceSystemFlowFileIdentifier, std::string detail, - uint64_t processingDuration); + void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration); protected: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/provenance/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h index 8bcc316..dd2c5ec 100644 --- a/libminifi/include/provenance/ProvenanceRepository.h +++ b/libminifi/include/provenance/ProvenanceRepository.h @@ -36,20 +36,16 @@ namespace provenance { #define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute #define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec -class ProvenanceRepository : public core::Repository, - public std::enable_shared_from_this<ProvenanceRepository> { +class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> { public: // Constructor /*! * Create a new provenance repository */ - ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, - int64_t maxPartitionMillis = - MAX_PROVENANCE_ENTRY_LIFE_TIME, - int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, - uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD) - : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory, - maxPartitionMillis, maxPartitionBytes, purgePeriod), + ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = + MAX_PROVENANCE_ENTRY_LIFE_TIME, + int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD) + : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod), logger_(logging::LoggerFactory<ProvenanceRepository>::getLogger()) { db_ = NULL; @@ -60,53 +56,42 @@ class ProvenanceRepository : public core::Repository, if (db_) delete db_; } - + void start() { - if (this->purge_period_ <= 0) - return; - if (running_) - return; - thread_ = std::thread(&ProvenanceRepository::run, shared_from_this()); - thread_.detach(); - running_ = true; - logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); -} + if (this->purge_period_ <= 0) + return; + if (running_) + return; + thread_ = std::thread(&ProvenanceRepository::run, shared_from_this()); + thread_.detach(); + running_ = true; + logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); + } // initialize virtual bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) { std::string value; - if (config->get(Configure::nifi_provenance_repository_directory_default, - value)) { + if (config->get(Configure::nifi_provenance_repository_directory_default, value)) { directory_ = value; } - logger_->log_info("NiFi Provenance Repository Directory %s", - directory_.c_str()); - if (config->get(Configure::nifi_provenance_repository_max_storage_size, - value)) { + logger_->log_info("NiFi Provenance Repository Directory %s", directory_.c_str()); + if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) { core::Property::StringToInt(value, max_partition_bytes_); } - logger_->log_info("NiFi Provenance Max Partition Bytes %d", - max_partition_bytes_); - if (config->get(Configure::nifi_provenance_repository_max_storage_time, - value)) { + logger_->log_info("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_); + if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, max_partition_millis_, unit) - && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, - max_partition_millis_)) { + if (core::Property::StringToTime(value, max_partition_millis_, unit) && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) { } } - logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", - max_partition_millis_); + logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_); leveldb::Options options; options.create_if_missing = true; - leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(), - &db_); + leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(), &db_); if (status.ok()) { - logger_->log_info("NiFi Provenance Repository database open %s success", - directory_.c_str()); + logger_->log_info("NiFi Provenance Repository database open %s success", directory_.c_str()); } else { - logger_->log_error("NiFi Provenance Repository database open %s fail", - directory_.c_str()); + logger_->log_error("NiFi Provenance Repository database open %s fail", directory_.c_str()); return false; } @@ -115,8 +100,8 @@ class ProvenanceRepository : public core::Repository, // Put virtual bool Put(std::string key, uint8_t *buf, int bufLen) { - if (repo_full_) - return false; + if (repo_full_) + return false; // persistent to the DB leveldb::Slice value((const char *) buf, bufLen); @@ -147,40 +132,33 @@ class ProvenanceRepository : public core::Repository, } // Persistent event void registerEvent(std::shared_ptr<ProvenanceEventRecord> &event) { - event->Serialize( - std::static_pointer_cast<core::Repository>(shared_from_this())); + event->Serialize(std::static_pointer_cast<core::Repository>(shared_from_this())); } // Remove event void removeEvent(ProvenanceEventRecord *event) { Delete(event->getEventId()); } //! get record - void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) - { - std::lock_guard<std::mutex> lock(mutex_); - leveldb::Iterator* it = db_->NewIterator( - leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); - std::string key = it->key().ToString(); - if (records.size() >= maxSize) - break; - if (eventRead->DeSerialize((uint8_t *) it->value().data(), - (int) it->value().size())) - { - records.push_back(eventRead); - } - } - delete it; + void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) { + std::lock_guard<std::mutex> lock(mutex_); + leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); + std::string key = it->key().ToString(); + if (records.size() >= maxSize) + break; + if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { + records.push_back(eventRead); + } + } + delete it; } //! purge record - void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) - { - std::lock_guard<std::mutex> lock(mutex_); - for (auto record : records) - { - Delete(record->getEventId()); - } + void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) { + std::lock_guard<std::mutex> lock(mutex_); + for (auto record : records) { + Delete(record->getEventId()); + } } // destroy void destroy() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/utils/ByteInputCallBack.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ByteInputCallBack.h b/libminifi/include/utils/ByteInputCallBack.h index 72303ff..a2b7838 100644 --- a/libminifi/include/utils/ByteInputCallBack.h +++ b/libminifi/include/utils/ByteInputCallBack.h @@ -41,8 +41,7 @@ class ByteInputCallBack : public InputStreamCallback { virtual void process(std::ifstream *stream) { - std::vector<char> nv = std::vector<char>(std::istreambuf_iterator<char>(*stream), - std::istreambuf_iterator<char>()); + std::vector<char> nv = std::vector<char>(std::istreambuf_iterator<char>(*stream), std::istreambuf_iterator<char>()); vec = std::move(nv); ptr = &vec[0]; @@ -53,9 +52,6 @@ class ByteInputCallBack : public InputStreamCallback { return ptr; } - - - const size_t getBufferSize() { return vec.size(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/utils/StringUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h index 0fc1996..b754467 100644 --- a/libminifi/include/utils/StringUtils.h +++ b/libminifi/include/utils/StringUtils.h @@ -66,11 +66,7 @@ class StringUtils { * @returns modified string */ static inline std::string trimLeft(std::string s) { - s.erase( - s.begin(), - std::find_if( - s.begin(), s.end(), - std::not1(std::pointer_to_unary_function<int, int>(std::isspace)))); + s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::pointer_to_unary_function<int, int>(std::isspace)))); return s; } @@ -81,15 +77,10 @@ class StringUtils { */ static inline std::string trimRight(std::string s) { - s.erase( - std::find_if( - s.rbegin(), s.rend(), - std::not1(std::pointer_to_unary_function<int, int>(std::isspace))) - .base(), - s.end()); + s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::pointer_to_unary_function<int, int>(std::isspace))).base(), s.end()); return s; } - + static std::vector<std::string> split(const std::string &str, const std::string &delimiter) { std::vector<std::string> result; int last = 0; @@ -108,14 +99,13 @@ class StringUtils { * @param output output float * @param cp failure policy */ - static bool StringToFloat(std::string input, float &output, FailurePolicy cp = - RETURN) { + static bool StringToFloat(std::string input, float &output, FailurePolicy cp = RETURN) { try { output = std::stof(input); } catch (const std::invalid_argument &ie) { switch (cp) { case RETURN: - case NOTHING: + case NOTHING: return false; case EXIT: exit(1); @@ -125,7 +115,7 @@ class StringUtils { } catch (const std::out_of_range &ofr) { switch (cp) { case RETURN: - case NOTHING: + case NOTHING: return false; case EXIT: exit(1); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/utils/ThreadPool.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 4c399a7..77772cd 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -185,23 +185,23 @@ class ThreadPool { } // determines if threads are detached bool daemon_threads_; -// max worker threads + // max worker threads int max_worker_threads_; -// current worker tasks. + // current worker tasks. std::atomic<int> current_workers_; -// thread queue + // thread queue std::vector<std::thread> thread_queue_; -// manager thread + // manager thread std::thread manager_thread_; -// atomic running boolean + // atomic running boolean std::atomic<bool> running_; -// worker queue of worker objects + // worker queue of worker objects moodycamel::ConcurrentQueue<Worker<T>> worker_queue_; -// notification for available work + // notification for available work std::condition_variable tasks_available_; -// manager mutex + // manager mutex std::recursive_mutex manager_mutex_; -// work queue mutex + // work queue mutex std::mutex worker_queue_mutex_; /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/utils/TimeUtil.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h index 6805419..19c2566 100644 --- a/libminifi/include/utils/TimeUtil.h +++ b/libminifi/include/utils/TimeUtil.h @@ -33,8 +33,7 @@ * @returns milliseconds since epoch */ inline uint64_t getTimeMillis() { - return std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()).count(); + return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); } /** @@ -43,8 +42,7 @@ inline uint64_t getTimeMillis() { */ inline uint64_t getTimeNano() { - return std::chrono::duration_cast<std::chrono::nanoseconds>( - std::chrono::system_clock::now().time_since_epoch()).count(); + return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); } @@ -58,8 +56,7 @@ inline std::string getTimeStr(uint64_t msec, bool enforce_locale = false) { char date[120]; time_t second = (time_t) (msec / 1000); msec = msec % 1000; - strftime(date, sizeof(date) / sizeof(*date), TIME_FORMAT, - (enforce_locale == true ? gmtime(&second) : localtime(&second))); + strftime(date, sizeof(date) / sizeof(*date), TIME_FORMAT, (enforce_locale == true ? gmtime(&second) : localtime(&second))); std::string ret = date; date[0] = '\0'; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 5c62a8d..d8e049c 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -23,50 +23,31 @@ namespace nifi { namespace minifi { const char *Configure::nifi_default_directory = "nifi.default.directory"; -const char *Configure::nifi_flow_configuration_file = - "nifi.flow.configuration.file"; +const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file"; const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads"; -const char *Configure::nifi_administrative_yield_duration = - "nifi.administrative.yield.duration"; +const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration"; const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration"; -const char *Configure::nifi_graceful_shutdown_seconds = - "nifi.flowcontroller.graceful.shutdown.period"; +const char *Configure::nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period"; const char *Configure::nifi_log_level = "nifi.log.level"; const char *Configure::nifi_server_name = "nifi.server.name"; -const char *Configure::nifi_configuration_class_name = - "nifi.flow.configuration.class.name"; -const char *Configure::nifi_flow_repository_class_name = - "nifi.flow.repository.class.name"; -const char *Configure::nifi_volatile_repository_options = - "nifi.volatile.repository.options."; -const char *Configure::nifi_provenance_repository_class_name = - "nifi.provenance.repository.class.name"; +const char *Configure::nifi_configuration_class_name = "nifi.flow.configuration.class.name"; +const char *Configure::nifi_flow_repository_class_name = "nifi.flow.repository.class.name"; +const char *Configure::nifi_volatile_repository_options = "nifi.volatile.repository.options."; +const char *Configure::nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name"; const char *Configure::nifi_server_port = "nifi.server.port"; -const char *Configure::nifi_server_report_interval = - "nifi.server.report.interval"; -const char *Configure::nifi_provenance_repository_max_storage_size = - "nifi.provenance.repository.max.storage.size"; -const char *Configure::nifi_provenance_repository_max_storage_time = - "nifi.provenance.repository.max.storage.time"; -const char *Configure::nifi_provenance_repository_directory_default = - "nifi.provenance.repository.directory.default"; -const char *Configure::nifi_flowfile_repository_max_storage_size = - "nifi.flowfile.repository.max.storage.size"; -const char *Configure::nifi_flowfile_repository_max_storage_time = - "nifi.flowfile.repository.max.storage.time"; -const char *Configure::nifi_flowfile_repository_directory_default = - "nifi.flowfile.repository.directory.default"; +const char *Configure::nifi_server_report_interval = "nifi.server.report.interval"; +const char *Configure::nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size"; +const char *Configure::nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time"; +const char *Configure::nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default"; +const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size"; +const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time"; +const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default"; const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure"; -const char *Configure::nifi_security_need_ClientAuth = - "nifi.security.need.ClientAuth"; -const char *Configure::nifi_security_client_certificate = - "nifi.security.client.certificate"; -const char *Configure::nifi_security_client_private_key = - "nifi.security.client.private.key"; -const char *Configure::nifi_security_client_pass_phrase = - "nifi.security.client.pass.phrase"; -const char *Configure::nifi_security_client_ca_certificate = - "nifi.security.client.ca.certificate"; +const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; +const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate"; +const char *Configure::nifi_security_client_private_key = "nifi.security.client.private.key"; +const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase"; +const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate"; } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index bc76044..0901a30 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -39,9 +39,7 @@ namespace apache { namespace nifi { namespace minifi { -Connection::Connection(std::shared_ptr<core::Repository> flow_repository, - std::string name, uuid_t uuid, uuid_t srcUUID, - uuid_t destUUID) +Connection::Connection(std::shared_ptr<core::Repository> flow_repository, std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID) : core::Connectable(name, uuid), flow_repository_(flow_repository), logger_(logging::LoggerFactory<Connection>::getLogger()) { @@ -91,8 +89,7 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) { queued_data_size_ += flow->getSize(); - logger_->log_debug("Enqueue flow file UUID %s to connection %s", - flow->getUUIDStr().c_str(), name_.c_str()); + logger_->log_debug("Enqueue flow file UUID %s to connection %s", flow->getUUIDStr().c_str(), name_.c_str()); } if (!flow->isStored()) { @@ -109,8 +106,7 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) { } } -std::shared_ptr<core::FlowFile> Connection::poll( - std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) { +std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) { std::lock_guard<std::mutex> lock(mutex_); while (!queue_.empty()) { @@ -134,11 +130,9 @@ std::shared_ptr<core::FlowFile> Connection::poll( queued_data_size_ += item->getSize(); break; } - std::shared_ptr<Connectable> connectable = std::static_pointer_cast< - Connectable>(shared_from_this()); + std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this()); item->setOriginalConnection(connectable); - logger_->log_debug("Dequeue flow file UUID %s from connection %s", - item->getUUIDStr().c_str(), name_.c_str()); + logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr().c_str(), name_.c_str()); // delete from the flowfile repo if (flow_repository_->Delete(item->getUUIDStr())) { @@ -155,11 +149,9 @@ std::shared_ptr<core::FlowFile> Connection::poll( queued_data_size_ += item->getSize(); break; } - std::shared_ptr<Connectable> connectable = std::static_pointer_cast< - Connectable>(shared_from_this()); + std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this()); item->setOriginalConnection(connectable); - logger_->log_debug("Dequeue flow file UUID %s from connection %s", - item->getUUIDStr().c_str(), name_.c_str()); + logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr().c_str(), name_.c_str()); // delete from the flowfile repo if (flow_repository_->Delete(item->getUUIDStr())) { item->setStoredToRepository(false); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/EventDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index fa2171b..8a2a874 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -32,22 +32,16 @@ namespace apache { namespace nifi { namespace minifi { -void EventDrivenSchedulingAgent::run( - std::shared_ptr<core::Processor> processor, - core::ProcessContext *processContext, - core::ProcessSessionFactory *sessionFactory) { +void EventDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) { while (this->running_) { - bool shouldYield = this->onTrigger(processor, processContext, - sessionFactory); + bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) { // Honor the yield - std::this_thread::sleep_for( - std::chrono::milliseconds(processor->getYieldTime())); + std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime())); } else if (shouldYield && this->bored_yield_duration_ > 0) { // No work to do or need to apply back pressure - std::this_thread::sleep_for( - std::chrono::milliseconds(this->bored_yield_duration_)); + std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_)); } // Block until work is available http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/FlowControlProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp index 69f482f..dbe27e8 100644 --- a/libminifi/src/FlowControlProtocol.cpp +++ b/libminifi/src/FlowControlProtocol.cpp @@ -65,18 +65,18 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) { } if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return 0; - } - } + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return 0; + } + } - int sndsize = 256*1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) { - logger_->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - return 0; - } + int sndsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) { + logger_->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + return 0; + } #endif struct sockaddr_in sa; @@ -108,9 +108,7 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) { return 0; } - logger_->log_info( - "Flow Control Protocol socket %d connect to server %s port %d success", - sock, host, port); + logger_->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port); return sock; } @@ -224,8 +222,7 @@ void FlowControlProtocol::stop() { void FlowControlProtocol::run(FlowControlProtocol *protocol) { while (protocol->running_) { - std::this_thread::sleep_for( - std::chrono::milliseconds(protocol->_reportInterval)); + std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval)); if (!protocol->_registered) { // if it is not register yet protocol->sendRegisterReq(); @@ -251,9 +248,7 @@ int FlowControlProtocol::sendRegisterReq() { return -1; // Calculate the total payload msg size - uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) - + FlowControlMsgIDEncodingLen(FLOW_YML_NAME, - this->_controller->getName().size() + 1); + uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) + FlowControlMsgIDEncodingLen(FLOW_YML_NAME, this->_controller->getName().size() + 1); uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; uint8_t *data = new uint8_t[size]; @@ -294,17 +289,13 @@ int FlowControlProtocol::sendRegisterReq() { if (status <= 0) { close(_socket); _socket = 0; - logger_->log_error( - "Flow Control Protocol Read Register Resp header failed"); + logger_->log_error("Flow Control Protocol Read Register Resp header failed"); return -1; } - logger_->log_info("Flow Control Protocol receive MsgType %s", - FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); + logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); - logger_->log_info("Flow Control Protocol receive Resp Code %s", - FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); - logger_->log_info("Flow Control Protocol receive Payload len %d", - hdr.payloadLen); + logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); + logger_->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) { this->_registered = true; @@ -327,8 +318,7 @@ int FlowControlProtocol::sendRegisterReq() { // Fixed 4 bytes uint32_t reportInterval; payloadPtr = this->decode(payloadPtr, reportInterval); - logger_->log_info("Flow Control Protocol receive report interval %d ms", - reportInterval); + logger_->log_info("Flow Control Protocol receive report interval %d ms", reportInterval); this->_reportInterval = reportInterval; } else { break; @@ -356,8 +346,7 @@ int FlowControlProtocol::sendReportReq() { return -1; // Calculate the total payload msg size - uint32_t payloadSize = FlowControlMsgIDEncodingLen( - FLOW_YML_NAME, this->_controller->getName().size() + 1); + uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_YML_NAME, this->_controller->getName().size() + 1); uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; uint8_t *data = new uint8_t[size]; @@ -397,13 +386,10 @@ int FlowControlProtocol::sendReportReq() { logger_->log_error("Flow Control Protocol Read Report Resp header failed"); return -1; } - logger_->log_info("Flow Control Protocol receive MsgType %s", - FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); + logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); - logger_->log_info("Flow Control Protocol receive Resp Code %s", - FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); - logger_->log_info("Flow Control Protocol receive Payload len %d", - hdr.payloadLen); + logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); + logger_->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) { this->_seqNumber++; @@ -428,27 +414,20 @@ int FlowControlProtocol::sendReportReq() { payloadPtr = this->decode(payloadPtr, len); processor = (const char *) payloadPtr; payloadPtr += len; - logger_->log_info( - "Flow Control Protocol receive report resp processor %s", - processor.c_str()); + logger_->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str()); } else if (((FlowControlMsgID) msgID) == PROPERTY_NAME) { uint32_t len; payloadPtr = this->decode(payloadPtr, len); propertyName = (const char *) payloadPtr; payloadPtr += len; - logger_->log_info( - "Flow Control Protocol receive report resp property name %s", - propertyName.c_str()); + logger_->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str()); } else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE) { uint32_t len; payloadPtr = this->decode(payloadPtr, len); propertyValue = (const char *) payloadPtr; payloadPtr += len; - logger_->log_info( - "Flow Control Protocol receive report resp property value %s", - propertyValue.c_str()); - this->_controller->updatePropertyValue(processor, propertyName, - propertyValue); + logger_->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str()); + this->_controller->updatePropertyValue(processor, propertyName, propertyValue); } else { break; } @@ -457,24 +436,21 @@ int FlowControlProtocol::sendReportReq() { close(_socket); _socket = 0; return 0; - } else if (hdr.status == RESP_TRIGGER_REGISTER - && hdr.seqNumber == this->_seqNumber) { + } else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber) { logger_->log_info("Flow Control Protocol trigger reregister"); this->_registered = false; this->_seqNumber++; close(_socket); _socket = 0; return 0; - } else if (hdr.status == RESP_STOP_FLOW_CONTROLLER - && hdr.seqNumber == this->_seqNumber) { + } else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) { logger_->log_info("Flow Control Protocol stop flow controller"); this->_controller->stop(true); this->_seqNumber++; close(_socket); _socket = 0; return 0; - } else if (hdr.status == RESP_START_FLOW_CONTROLLER - && hdr.seqNumber == this->_seqNumber) { + } else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) { logger_->log_info("Flow Control Protocol start flow controller"); this->_controller->start(); this->_seqNumber++; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index c7df2e7..62cf21c 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -48,14 +48,9 @@ namespace minifi { #define DEFAULT_CONFIG_NAME "conf/flow.yml" -FlowController::FlowController( - std::shared_ptr<core::Repository> provenance_repo, - std::shared_ptr<core::Repository> flow_file_repo, - std::shared_ptr<Configure> configure, - std::unique_ptr<core::FlowConfiguration> flow_configuration, - const std::string name, bool headless_mode) - : core::controller::ControllerServiceProvider( - core::getClassName<FlowController>()), +FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, + std::unique_ptr<core::FlowConfiguration> flow_configuration, const std::string name, bool headless_mode) + : core::controller::ControllerServiceProvider(core::getClassName<FlowController>()), root_(nullptr), max_timer_driven_threads_(0), max_event_driven_threads_(0), @@ -64,14 +59,13 @@ FlowController::FlowController( provenance_repo_(provenance_repo), flow_file_repo_(flow_file_repo), protocol_(0), - controller_service_map_( - std::make_shared<core::controller::ControllerServiceMap>()), + controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()), timer_scheduler_(nullptr), event_scheduler_(nullptr), controller_service_provider_(nullptr), flow_configuration_(std::move(flow_configuration)), configuration_(configure), - logger_(logging::LoggerFactory<FlowController>::getLogger()) { + logger_(logging::LoggerFactory<FlowController>::getLogger()) { if (provenance_repo == nullptr) throw std::runtime_error("Provenance Repo should not be null"); if (flow_file_repo == nullptr) @@ -96,8 +90,7 @@ FlowController::FlowController( if (!headless_mode) { std::string rawConfigFileString; - configure->get(Configure::nifi_flow_configuration_file, - rawConfigFileString); + configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString); if (!rawConfigFileString.empty()) { configuration_filename_ = rawConfigFileString; @@ -107,8 +100,7 @@ FlowController::FlowController( if (!configuration_filename_.empty()) { // perform a naive determination if this is a relative path if (configuration_filename_.c_str()[0] != '/') { - adjustedFilename = adjustedFilename + configure->getHome() + "/" - + configuration_filename_; + adjustedFilename = adjustedFilename + configure->getHome() + "/" + configuration_filename_; } else { adjustedFilename = configuration_filename_; } @@ -124,19 +116,16 @@ void FlowController::initializePaths(const std::string &adjustedFilename) { path = realpath(adjustedFilename.c_str(), full_path); if (path == NULL) { - throw std::runtime_error( - "Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists"); + throw std::runtime_error("Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists"); } std::string pathString(path); configuration_filename_ = pathString; - logger_->log_info("FlowController NiFi Configuration file %s", - pathString.c_str()); + logger_->log_info("FlowController NiFi Configuration file %s", pathString.c_str()); // Create the content repo directory if needed struct stat contentDirStat; - if (stat(ResourceClaim::default_directory_path, &contentDirStat) - != -1&& S_ISDIR(contentDirStat.st_mode)) { + if (stat(ResourceClaim::default_directory_path, &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) { path = realpath(ResourceClaim::default_directory_path, full_path); logger_->log_info("FlowController content directory %s", full_path); } else { @@ -149,9 +138,7 @@ void FlowController::initializePaths(const std::string &adjustedFilename) { std::string clientAuthStr; if (!path) { - logger_->log_error( - "Could not locate path from provided configuration file name (%s). Exiting.", - full_path); + logger_->log_error("Could not locate path from provided configuration file name (%s). Exiting.", full_path); exit(1); } } @@ -179,8 +166,7 @@ void FlowController::stop(bool force) { // Wait for sometime for thread stop std::this_thread::sleep_for(std::chrono::milliseconds(1000)); if (this->root_) - this->root_->stopProcessing(this->timer_scheduler_.get(), - this->event_scheduler_.get()); + this->root_->stopProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get()); } } @@ -196,13 +182,10 @@ void FlowController::stop(bool force) { void FlowController::waitUnload(const uint64_t timeToWaitMs) { if (running_) { // use the current time and increment with the provided argument. - std::chrono::system_clock::time_point wait_time = - std::chrono::system_clock::now() - + std::chrono::milliseconds(timeToWaitMs); + std::chrono::system_clock::time_point wait_time = std::chrono::system_clock::now() + std::chrono::milliseconds(timeToWaitMs); // create an asynchronous future. - std::future<void> unload_task = std::async(std::launch::async, - [this]() {unload();}); + std::future<void> unload_task = std::async(std::launch::async, [this]() {unload();}); if (std::future_status::ready == unload_task.wait_until(wait_time)) { running_ = false; @@ -233,32 +216,21 @@ void FlowController::load() { if (!initialized_) { logger_->log_info("Initializing timers"); if (nullptr == timer_scheduler_) { - timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>( - std::static_pointer_cast<core::controller::ControllerServiceProvider>( - shared_from_this()), - provenance_repo_, configuration_); + timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()), provenance_repo_, configuration_); } if (nullptr == event_scheduler_) { - event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>( - std::static_pointer_cast<core::controller::ControllerServiceProvider>( - shared_from_this()), - provenance_repo_, configuration_); + event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()), provenance_repo_, configuration_); } - logger_->log_info("Load Flow Controller from file %s", - configuration_filename_.c_str()); + logger_->log_info("Load Flow Controller from file %s", configuration_filename_.c_str()); - this->root_ = std::shared_ptr<core::ProcessGroup>( - flow_configuration_->getRoot(configuration_filename_)); + this->root_ = std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_)); logger_->log_info("Loaded root processor Group"); - controller_service_provider_ = flow_configuration_ - ->getControllerServiceProvider(); + controller_service_provider_ = flow_configuration_->getControllerServiceProvider(); - std::static_pointer_cast<core::controller::StandardControllerServiceProvider>( - controller_service_provider_)->setRootGroup(root_); - std::static_pointer_cast<core::controller::StandardControllerServiceProvider>( - controller_service_provider_)->setSchedulingAgent( + std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_); + std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent( std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_)); logger_->log_info("Loaded controller service provider"); @@ -271,8 +243,7 @@ void FlowController::load() { void FlowController::reload(std::string yamlFile) { std::lock_guard<std::recursive_mutex> flow_lock(mutex_); - logger_->log_info("Starting to reload Flow Controller with yaml %s", - yamlFile.c_str()); + logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str()); stop(true); unload(); std::string oldYamlFile = this->configuration_filename_; @@ -281,8 +252,7 @@ void FlowController::reload(std::string yamlFile) { start(); if (this->root_ != nullptr) { this->configuration_filename_ = oldYamlFile; - logger_->log_info("Rollback Flow Controller to YAML %s", - oldYamlFile.c_str()); + logger_->log_info("Rollback Flow Controller to YAML %s", oldYamlFile.c_str()); stop(true); unload(); load(); @@ -297,10 +267,8 @@ void FlowController::loadFlowRepo() { if (this->root_ != nullptr) { this->root_->getConnections(connectionMap); } - logger_->log_debug("Number of connections from connectionMap %d", - connectionMap.size()); - auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>( - flow_file_repo_); + logger_->log_debug("Number of connections from connectionMap %d", connectionMap.size()); + auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>(flow_file_repo_); if (nullptr != rep) { rep->setConnectionMap(connectionMap); } @@ -313,8 +281,7 @@ void FlowController::loadFlowRepo() { bool FlowController::start() { std::lock_guard<std::recursive_mutex> flow_lock(mutex_); if (!initialized_) { - logger_->log_error( - "Can not start Flow Controller because it has not been initialized"); + logger_->log_error("Can not start Flow Controller because it has not been initialized"); return false; } else { if (!running_) { @@ -323,8 +290,7 @@ bool FlowController::start() { this->timer_scheduler_->start(); this->event_scheduler_->start(); if (this->root_ != nullptr) { - this->root_->startProcessing(this->timer_scheduler_.get(), - this->event_scheduler_.get()); + this->root_->startProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get()); } running_ = true; this->protocol_->start(); @@ -346,11 +312,9 @@ bool FlowController::start() { * @param id service identifier * @param firstTimeAdded first time this CS was added */ -std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService( - const std::string &type, const std::string &id, - bool firstTimeAdded) { - return controller_service_provider_->createControllerService(type, id, - firstTimeAdded); +std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService(const std::string &type, const std::string &id, +bool firstTimeAdded) { + return controller_service_provider_->createControllerService(type, id, firstTimeAdded); } /** @@ -361,8 +325,7 @@ std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createC * @param serviceNode service node to be removed. */ -void FlowController::removeControllerService( - const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +void FlowController::removeControllerService(const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { controller_map_->removeControllerService(serviceNode); } @@ -370,8 +333,7 @@ void FlowController::removeControllerService( * Enables the controller service services * @param serviceNode service node which will be disabled, along with linked services. */ -void FlowController::enableControllerService( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +void FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { return controller_service_provider_->enableControllerService(serviceNode); } @@ -379,16 +341,14 @@ void FlowController::enableControllerService( * Enables controller services * @param serviceNoden vector of service nodes which will be enabled, along with linked services. */ -void FlowController::enableControllerServices( - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) { +void FlowController::enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) { } /** * Disables controller services * @param serviceNode service node which will be disabled, along with linked services. */ -void FlowController::disableControllerService( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +void FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { controller_service_provider_->disableControllerService(serviceNode); } @@ -404,40 +364,33 @@ std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowContro * @param id service identifier * @return shared pointer to the controller service node or nullptr if it does not exist. */ -std::shared_ptr<core::controller::ControllerServiceNode> FlowController::getControllerServiceNode( - const std::string &id) { +std::shared_ptr<core::controller::ControllerServiceNode> FlowController::getControllerServiceNode(const std::string &id) { return controller_service_provider_->getControllerServiceNode(id); } -void FlowController::verifyCanStopReferencingComponents( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +void FlowController::verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { } /** * Unschedules referencing components. */ -std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::unscheduleReferencingComponents( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - return controller_service_provider_->unscheduleReferencingComponents( - serviceNode); +std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + return controller_service_provider_->unscheduleReferencingComponents(serviceNode); } /** * Verify can disable referencing components * @param serviceNode service node whose referenced components will be scheduled. */ -void FlowController::verifyCanDisableReferencingServices( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - controller_service_provider_->verifyCanDisableReferencingServices( - serviceNode); +void FlowController::verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + controller_service_provider_->verifyCanDisableReferencingServices(serviceNode); } /** * Disables referencing components * @param serviceNode service node whose referenced components will be scheduled. */ -std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::disableReferencingServices( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { return controller_service_provider_->disableReferencingServices(serviceNode); } @@ -445,8 +398,7 @@ std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowContro * Verify can enable referencing components * @param serviceNode service node whose referenced components will be scheduled. */ -void FlowController::verifyCanEnableReferencingServices( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +void FlowController::verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { controller_service_provider_->verifyCanEnableReferencingServices(serviceNode); } @@ -461,8 +413,7 @@ bool FlowController::isControllerServiceEnabled(const std::string &identifier) { * Enables referencing components * @param serviceNode service node whose referenced components will be scheduled. */ -std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::enableReferencingServices( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { return controller_service_provider_->enableReferencingServices(serviceNode); } @@ -470,20 +421,16 @@ std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowContro * Schedules referencing components * @param serviceNode service node whose referenced components will be scheduled. */ -std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::scheduleReferencingComponents( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - return controller_service_provider_->scheduleReferencingComponents( - serviceNode); +std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + return controller_service_provider_->scheduleReferencingComponents(serviceNode); } /** * Returns controller service components referenced by serviceIdentifier from the embedded * controller service provider; */ -std::shared_ptr<core::controller::ControllerService> FlowController::getControllerServiceForComponent( - const std::string &serviceIdentifier, const std::string &componentId) { - return controller_service_provider_->getControllerServiceForComponent( - serviceIdentifier, componentId); +std::shared_ptr<core::controller::ControllerService> FlowController::getControllerServiceForComponent(const std::string &serviceIdentifier, const std::string &componentId) { + return controller_service_provider_->getControllerServiceForComponent(serviceIdentifier, componentId); } /**
