MINIFI-254: Incremental update for linter changes Update YAML to run linter at build time. Tests will run before linter so that we can give contributors a chance to see tests run in the event there are linter failures.
This closes #73. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/be3f2ffe Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/be3f2ffe Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/be3f2ffe Branch: refs/heads/master Commit: be3f2ffea3173b46d02a7d484cd901bf36d51e63 Parents: 3676468 Author: Marc Parisi <[email protected]> Authored: Mon Apr 3 16:27:24 2017 -0400 Committer: Aldrin Piri <[email protected]> Committed: Wed Apr 5 17:48:30 2017 +0200 ---------------------------------------------------------------------- .travis.yml | 2 +- libminifi/include/Connection.h | 2 +- libminifi/include/Exception.h | 93 ++-- libminifi/include/FlowControlProtocol.h | 11 +- libminifi/include/FlowController.h | 5 +- libminifi/include/RemoteProcessorGroupPort.h | 21 +- libminifi/include/ResourceClaim.h | 109 ++--- libminifi/include/SchedulingAgent.h | 20 +- libminifi/include/Site2SiteClientProtocol.h | 11 +- libminifi/include/Site2SitePeer.h | 2 - libminifi/include/core/ConfigurationFactory.h | 24 +- libminifi/include/core/Connectable.h | 2 +- libminifi/include/core/Core.h | 178 +++++++ libminifi/include/core/FlowConfiguration.h | 4 +- libminifi/include/core/FlowFile.h | 9 +- libminifi/include/core/ProcessSession.h | 17 +- libminifi/include/core/Processor.h | 9 +- libminifi/include/core/ProcessorConfig.h | 6 +- libminifi/include/core/Repository.h | 19 +- libminifi/include/core/RepositoryFactory.h | 9 +- libminifi/include/core/core.h | 180 ------- .../core/repository/FlowFileRepository.h | 116 +++-- libminifi/include/io/BaseStream.h | 1 - libminifi/include/io/ClientSocket.h | 14 +- libminifi/include/io/StreamFactory.h | 157 +++--- libminifi/include/io/tls/TLSSocket.h | 17 +- libminifi/include/io/validation.h | 10 +- libminifi/include/processors/AppendHostInfo.h | 9 +- libminifi/include/processors/ExecuteProcess.h | 9 +- libminifi/include/processors/GenerateFlowFile.h | 9 +- libminifi/include/processors/GetFile.h | 55 ++- libminifi/include/processors/ListenHTTP.h | 20 +- libminifi/include/processors/ListenSyslog.h | 15 +- libminifi/include/processors/LogAttribute.h | 9 +- libminifi/include/processors/PutFile.h | 17 +- .../include/processors/RealTimeDataCollector.h | 145 ------ libminifi/include/processors/TailFile.h | 6 +- libminifi/include/properties/Configure.h | 2 +- libminifi/include/provenance/Provenance.h | 1 - .../include/provenance/ProvenanceRepository.h | 4 +- libminifi/include/utils/FailurePolicy.h | 35 +- libminifi/include/utils/StringUtils.h | 182 +++---- libminifi/include/utils/TimeUtil.h | 32 +- libminifi/src/Configure.cpp | 253 +++++----- libminifi/src/Connection.cpp | 10 +- libminifi/src/EventDrivenSchedulingAgent.cpp | 4 +- libminifi/src/FlowControlProtocol.cpp | 46 +- libminifi/src/FlowController.cpp | 31 +- libminifi/src/FlowFileRecord.cpp | 54 +-- libminifi/src/RemoteProcessorGroupPort.cpp | 20 +- libminifi/src/ResourceClaim.cpp | 3 +- libminifi/src/SchedulingAgent.cpp | 16 +- libminifi/src/Site2SiteClientProtocol.cpp | 57 ++- libminifi/src/Site2SitePeer.cpp | 31 +- libminifi/src/ThreadedSchedulingAgent.cpp | 42 +- libminifi/src/TimerDrivenSchedulingAgent.cpp | 4 +- libminifi/src/core/ConfigurableComponent.cpp | 21 +- libminifi/src/core/ConfigurationFactory.cpp | 72 +-- libminifi/src/core/Connectable.cpp | 35 +- libminifi/src/core/Core.cpp | 34 +- libminifi/src/core/FlowConfiguration.cpp | 8 +- libminifi/src/core/FlowFile.cpp | 223 +++++++++ libminifi/src/core/ProcessGroup.cpp | 26 +- libminifi/src/core/ProcessSession.cpp | 74 ++- libminifi/src/core/ProcessSessionFactory.cpp | 10 +- libminifi/src/core/Processor.cpp | 36 +- libminifi/src/core/ProcessorNode.cpp | 24 +- libminifi/src/core/Property.cpp | 5 +- libminifi/src/core/Record.cpp | 225 --------- libminifi/src/core/Repository.cpp | 4 +- libminifi/src/core/RepositoryFactory.cpp | 98 ++-- libminifi/src/core/logging/BaseLogger.cpp | 7 +- libminifi/src/core/logging/LogAppenders.cpp | 13 +- libminifi/src/core/logging/Logger.cpp | 1 + .../src/core/repository/FlowFileRepository.cpp | 76 +-- libminifi/src/core/yaml/YamlConfiguration.cpp | 20 +- libminifi/src/io/BaseStream.cpp | 36 +- libminifi/src/io/CRCStream.cpp | 2 - libminifi/src/io/ClientSocket.cpp | 82 +--- libminifi/src/io/DataStream.cpp | 150 +++--- libminifi/src/io/EndianCheck.cpp | 1 - libminifi/src/io/Serializable.cpp | 24 +- libminifi/src/io/StreamFactory.cpp | 8 +- libminifi/src/io/tls/TLSSocket.cpp | 359 +++++++------- libminifi/src/processors/AppendHostInfo.cpp | 41 +- libminifi/src/processors/ExecuteProcess.cpp | 34 +- libminifi/src/processors/GenerateFlowFile.cpp | 192 ++++---- libminifi/src/processors/GetFile.cpp | 41 +- libminifi/src/processors/ListenHTTP.cpp | 76 ++- libminifi/src/processors/ListenSyslog.cpp | 55 +-- libminifi/src/processors/LogAttribute.cpp | 31 +- libminifi/src/processors/PutFile.cpp | 20 +- .../src/processors/RealTimeDataCollector.cpp | 480 ------------------- libminifi/src/processors/TailFile.cpp | 58 +-- libminifi/src/provenance/Provenance.cpp | 38 +- .../src/provenance/ProvenanceRepository.cpp | 26 +- libminifi/test/TestBase.h | 4 +- libminifi/test/nodefs/NoLevelDB.cpp | 2 +- libminifi/test/nodefs/NoYamlConfiguration.cpp | 2 +- libminifi/test/unit/ProcessorTests.cpp | 14 +- libminifi/test/unit/ProvenanceTestHelper.h | 2 +- libminifi/test/unit/ProvenanceTests.cpp | 2 +- libminifi/test/unit/RepoTests.cpp | 2 +- main/MiNiFiMain.cpp | 2 +- thirdparty/google-styleguide/cpplint.py | 6 - thirdparty/google-styleguide/run_linter.sh | 7 +- 106 files changed, 2084 insertions(+), 2894 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index faa291b..cc9c8e2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -47,4 +47,4 @@ matrix: - package='openssl'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package} script: - - mkdir ./build && cd ./build && cmake .. && make && make test + - mkdir ./build && cd ./build && cmake .. && make && make test && make linter http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/Connection.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h index 1c7b9a4..8fe42d0 100644 --- a/libminifi/include/Connection.h +++ b/libminifi/include/Connection.h @@ -27,7 +27,7 @@ #include <mutex> #include <atomic> #include <algorithm> -#include "core/core.h" +#include "core/Core.h" #include "core/Connectable.h" #include "core/logging/Logger.h" #include "core/Relationship.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/Exception.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h index a0c70e6..88a3ed2 100644 --- a/libminifi/include/Exception.h +++ b/libminifi/include/Exception.h @@ -26,74 +26,65 @@ #include <errno.h> #include <string.h> - namespace org { namespace apache { namespace nifi { namespace minifi { // ExceptionType -enum ExceptionType -{ - FILE_OPERATION_EXCEPTION = 0, - FLOW_EXCEPTION, - PROCESSOR_EXCEPTION, - PROCESS_SESSION_EXCEPTION, - PROCESS_SCHEDULE_EXCEPTION, - SITE2SITE_EXCEPTION, - GENERAL_EXCEPTION, - MAX_EXCEPTION +enum ExceptionType { + FILE_OPERATION_EXCEPTION = 0, + FLOW_EXCEPTION, + PROCESSOR_EXCEPTION, + PROCESS_SESSION_EXCEPTION, + PROCESS_SCHEDULE_EXCEPTION, + SITE2SITE_EXCEPTION, + GENERAL_EXCEPTION, + MAX_EXCEPTION }; // Exception String -static const char *ExceptionStr[MAX_EXCEPTION] = -{ - "File Operation", - "Flow File Operation", - "Processor Operation", - "Process Session Operation", - "Process Schedule Operation", - "Site2Site Protocol", - "General Operation" -}; +static const char *ExceptionStr[MAX_EXCEPTION] = { "File Operation", + "Flow File Operation", "Processor Operation", "Process Session Operation", + "Process Schedule Operation", "Site2Site Protocol", "General Operation" }; // Exception Type to String -inline const char *ExceptionTypeToString(ExceptionType type) -{ - if (type < MAX_EXCEPTION) - return ExceptionStr[type]; - else - return NULL; +inline const char *ExceptionTypeToString(ExceptionType type) { + if (type < MAX_EXCEPTION) + return ExceptionStr[type]; + else + return NULL; } // Exception Class -class Exception : public std::exception -{ -public: - // Constructor - /*! - * Create a new flow record - */ - Exception(ExceptionType type, const char *errorMsg) : _type(type), _errorMsg(errorMsg) { - } - // Destructor - virtual ~Exception() throw () {} - virtual const char * what() const throw () { - - _whatStr = ExceptionTypeToString(_type); +class Exception : public std::exception { + public: + // Constructor + /*! + * Create a new flow record + */ + Exception(ExceptionType type, const char *errorMsg) + : _type(type), + _errorMsg(errorMsg) { + } + // Destructor + virtual ~Exception() throw () { + } + virtual const char * what() const throw () { - _whatStr += ":" + _errorMsg; - return _whatStr.c_str(); - } + _whatStr = ExceptionTypeToString(_type); + _whatStr += ":" + _errorMsg; + return _whatStr.c_str(); + } -private: - // Exception type - ExceptionType _type; - // Exception detailed information - std::string _errorMsg; - // Hold the what result - mutable std::string _whatStr; + private: + // Exception type + ExceptionType _type; + // Exception detailed information + std::string _errorMsg; + // Hold the what result + mutable std::string _whatStr; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/FlowControlProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h index c37c8f8..826f999 100644 --- a/libminifi/include/FlowControlProtocol.h +++ b/libminifi/include/FlowControlProtocol.h @@ -37,7 +37,6 @@ #include "properties/Configure.h" #include "core/logging/Logger.h" - namespace org { namespace apache { namespace nifi { @@ -179,16 +178,14 @@ class FlowControlProtocol { logger_->log_info("NiFi Server Name %s", _serverName.c_str()); } if (configure_->get(Configure::nifi_server_port, value) - && core::Property::StringToInt( - value, _serverPort)) { + && core::Property::StringToInt(value, _serverPort)) { logger_->log_info("NiFi Server Port: [%d]", _serverPort); } if (configure_->get(Configure::nifi_server_report_interval, value)) { core::TimeUnit unit; - if (core::Property::StringToTime( - value, _reportInterval, unit) - && core::Property::ConvertTimeUnitToMS( - _reportInterval, unit, _reportInterval)) { + if (core::Property::StringToTime(value, _reportInterval, unit) + && core::Property::ConvertTimeUnitToMS(_reportInterval, unit, + _reportInterval)) { logger_->log_info("NiFi server report interval: [%d] ms", _reportInterval); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 0475623..e007f80 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -67,7 +67,8 @@ class FlowController : public core::CoreComponent { FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::unique_ptr<core::FlowConfiguration> flow_configuration, - const std::string name = DEFAULT_ROOT_GROUP_NAME,bool headless_mode=false); + const std::string name = DEFAULT_ROOT_GROUP_NAME, + bool headless_mode = false); // Destructor virtual ~FlowController(); @@ -122,7 +123,7 @@ class FlowController : public core::CoreComponent { // update property value void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { - if (root_ != nullptr) + if (root_ != nullptr) root_->updatePropertyValue(processorName, propertyName, propertyValue); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index e9a4228..8667519 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -28,14 +28,12 @@ #include "core/ProcessSession.h" #include "Site2SiteClientProtocol.h" - namespace org { namespace apache { namespace nifi { namespace minifi { // RemoteProcessorGroupPort Class -class RemoteProcessorGroupPort : - public core::Processor { +class RemoteProcessorGroupPort : public core::Processor { public: // Constructor /*! @@ -44,9 +42,9 @@ class RemoteProcessorGroupPort : RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL) : core::Processor(name, uuid), direction_(SEND), - transmitting_(false){ + transmitting_(false) { logger_ = logging::Logger::getLogger(); - uuid_copy(protocol_uuid_,uuid); + uuid_copy(protocol_uuid_, uuid); } // Destructor virtual ~RemoteProcessorGroupPort() { @@ -61,9 +59,8 @@ class RemoteProcessorGroupPort : static core::Relationship relation; public: // OnTrigger method, implemented by NiFi RemoteProcessorGroupPort - virtual void onTrigger( - core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); // Initialize, over write by NiFi RemoteProcessorGroupPort virtual void initialize(void); // Set Direction @@ -84,10 +81,10 @@ class RemoteProcessorGroupPort : protected: private: - + std::unique_ptr<Site2SiteClientProtocol> getNextProtocol(); void returnProtocol(std::unique_ptr<Site2SiteClientProtocol> protocol); - + std::stack<std::unique_ptr<Site2SiteClientProtocol>> available_protocols_; std::mutex protocol_mutex_; // Logger @@ -98,9 +95,9 @@ class RemoteProcessorGroupPort : bool transmitting_; // timeout uint64_t timeout_; - + uuid_t protocol_uuid_; - + }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/ResourceClaim.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h index 4c5438c..1f5d17f 100644 --- a/libminifi/include/ResourceClaim.h +++ b/libminifi/include/ResourceClaim.h @@ -34,74 +34,67 @@ namespace apache { namespace nifi { namespace minifi { - // Default content directory #define DEFAULT_CONTENT_DIRECTORY "./content_repository" - - // ResourceClaim Class class ResourceClaim { -public: - - static std::string default_directory_path; - // Constructor - /*! - * Create a new resource claim - */ - ResourceClaim(const std::string contentDirectory = default_directory_path); - // Destructor - virtual ~ResourceClaim() {} - // increaseFlowFileRecordOwnedCount - void increaseFlowFileRecordOwnedCount() - { - ++_flowFileRecordOwnedCount; - } - // decreaseFlowFileRecordOwenedCount - void decreaseFlowFileRecordOwnedCount() - { - --_flowFileRecordOwnedCount; - } - // getFlowFileRecordOwenedCount - uint64_t getFlowFileRecordOwnedCount() - { - return _flowFileRecordOwnedCount; - } - // Get the content full path - std::string getContentFullPath() - { - return _contentFullPath; - } - // Set the content full path - void setContentFullPath(std::string path) - { - _contentFullPath = path; - } + public: + + static char *default_directory_path; + // Constructor + /*! + * Create a new resource claim + */ + ResourceClaim(const std::string contentDirectory = default_directory_path); + // Destructor + virtual ~ResourceClaim() { + } + // increaseFlowFileRecordOwnedCount + void increaseFlowFileRecordOwnedCount() { + ++_flowFileRecordOwnedCount; + } + // decreaseFlowFileRecordOwenedCount + void decreaseFlowFileRecordOwnedCount() { + --_flowFileRecordOwnedCount; + } + // getFlowFileRecordOwenedCount + uint64_t getFlowFileRecordOwnedCount() { + return _flowFileRecordOwnedCount; + } + // Get the content full path + std::string getContentFullPath() { + return _contentFullPath; + } + // Set the content full path + void setContentFullPath(std::string path) { + _contentFullPath = path; + } -protected: - // A global unique identifier - uuid_t _uuid; - // A local unique identifier - uint64_t _id; - // Full path to the content - std::string _contentFullPath; + protected: + // A global unique identifier + uuid_t _uuid; + // A local unique identifier + uint64_t _id; + // Full path to the content + std::string _contentFullPath; - // How many FlowFileRecord Own this cliam - std::atomic<uint64_t> _flowFileRecordOwnedCount; + // How many FlowFileRecord Own this cliam + std::atomic<uint64_t> _flowFileRecordOwnedCount; -private: - // Configure - Configure *configure_; - // Logger - std::shared_ptr<logging::Logger> logger_; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ResourceClaim(const ResourceClaim &parent); - ResourceClaim &operator=(const ResourceClaim &parent); + private: + // Configure + Configure *configure_; + // Logger + std::shared_ptr<logging::Logger> logger_; + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + ResourceClaim(const ResourceClaim &parent); + ResourceClaim &operator=(const ResourceClaim &parent); - // Local resource claim number - static std::atomic<uint64_t> _localResourceClaimNumber; + // Local resource claim number + static std::atomic<uint64_t> _localResourceClaimNumber; }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 0493640..e7d1e58 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -28,7 +28,7 @@ #include <algorithm> #include <thread> #include "utils/TimeUtil.h" -#include "core/core.h" +#include "core/Core.h" #include "core/logging/Logger.h" #include "properties/Configure.h" #include "FlowFileRecord.h" @@ -37,13 +37,11 @@ #include "core/ProcessContext.h" #include "provenance/ProvenanceRepository.h" - namespace org { namespace apache { namespace nifi { namespace minifi { - // SchedulingAgent Class class SchedulingAgent { public: @@ -62,15 +60,13 @@ class SchedulingAgent { } // onTrigger, return whether the yield is need - bool onTrigger( - std::shared_ptr<core::Processor> processor, - core::ProcessContext *processContext, - core::ProcessSessionFactory *sessionFactory); + bool onTrigger(std::shared_ptr<core::Processor> processor, + core::ProcessContext *processContext, + core::ProcessSessionFactory *sessionFactory); // Whether agent has work to do bool hasWorkToDo(std::shared_ptr<core::Processor> processor); // Whether the outgoing need to be backpressure - bool hasTooMuchOutGoing( - std::shared_ptr<core::Processor> processor); + bool hasTooMuchOutGoing(std::shared_ptr<core::Processor> processor); // start void start() { running_ = true; @@ -82,11 +78,9 @@ class SchedulingAgent { public: // schedule, overwritten by different DrivenSchedulingAgent - virtual void schedule( - std::shared_ptr<core::Processor> processor) = 0; + virtual void schedule(std::shared_ptr<core::Processor> processor) = 0; // unschedule, overwritten by different DrivenSchedulingAgent - virtual void unschedule( - std::shared_ptr<core::Processor> processor) = 0; + virtual void unschedule(std::shared_ptr<core::Processor> processor) = 0; SchedulingAgent(const SchedulingAgent &parent) = delete; SchedulingAgent &operator=(const SchedulingAgent &parent) = delete; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h index 6120e3e..78673d8 100644 --- a/libminifi/include/Site2SiteClientProtocol.h +++ b/libminifi/include/Site2SiteClientProtocol.h @@ -44,7 +44,6 @@ #include "core/ProcessSession.h" #include "io/CRCStream.h" - namespace org { namespace apache { namespace nifi { @@ -530,13 +529,11 @@ class Site2SiteClientProtocol { // Error the transaction void error(std::string transactionID); // Receive flow files for the process session - void receiveFlowFiles( - core::ProcessContext *context, - core::ProcessSession *session); + void receiveFlowFiles(core::ProcessContext *context, + core::ProcessSession *session); // Transfer flow files for the process session - void transferFlowFiles( - core::ProcessContext *context, - core::ProcessSession *session); + void transferFlowFiles(core::ProcessContext *context, + core::ProcessSession *session); // deleteTransaction void deleteTransaction(std::string transactionID); // Nest Callback Class for write stream http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/Site2SitePeer.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SitePeer.h b/libminifi/include/Site2SitePeer.h index de3a42f..a315293 100644 --- a/libminifi/include/Site2SitePeer.h +++ b/libminifi/include/Site2SitePeer.h @@ -37,7 +37,6 @@ #include "io/BaseStream.h" #include "utils/TimeUtil.h" - namespace org { namespace apache { namespace nifi { @@ -281,7 +280,6 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { }; - } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/ConfigurationFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h index 19ed5f4..b25faff 100644 --- a/libminifi/include/core/ConfigurationFactory.h +++ b/libminifi/include/core/ConfigurationFactory.h @@ -28,33 +28,29 @@ namespace nifi { namespace minifi { namespace core { - - - template<typename T> typename std::enable_if<!class_operations<T>::value, T*>::type instantiate( - std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo,const std::string path ) { + std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo, const std::string path) { throw std::runtime_error("Cannot instantiate class"); } template<typename T> typename std::enable_if<class_operations<T>::value, T*>::type instantiate( - std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo,const std::string path ) { - return new T(repo,flow_file_repo,path); + std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo, const std::string path) { + return new T(repo, flow_file_repo, path); } - /** * Configuration factory is used to create a new FlowConfiguration * object. */ - std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( - std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo, - const std::string configuration_class_name, const std::string path = "", - bool fail_safe = false); +std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( + std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo, + const std::string configuration_class_name, const std::string path = "", + bool fail_safe = false); } /* namespace core */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/Connectable.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h index 15e618f..f7e425e 100644 --- a/libminifi/include/core/Connectable.h +++ b/libminifi/include/core/Connectable.h @@ -20,7 +20,7 @@ #define LIBMINIFI_INCLUDE_CORE_CONNECTABLE_H_ #include <set> -#include "core.h" +#include "Core.h" #include <condition_variable> #include "core/logging/Logger.h" #include "Relationship.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/Core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h new file mode 100644 index 0000000..1010be7 --- /dev/null +++ b/libminifi/include/core/Core.h @@ -0,0 +1,178 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_CORE_H_ +#define LIBMINIFI_INCLUDE_CORE_CORE_H_ + +#include <uuid/uuid.h> +#include <cxxabi.h> +#include "core/logging/Logger.h" +/** + * namespace aliasing + */ +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { +} +namespace processors { +} +namespace provenance { + +} +namespace core { + +template<typename T> +static inline std::string getClassName() { + char *b = abi::__cxa_demangle(typeid(T).name(), 0, 0, 0); + std::string name = b; + delete[] b; + return name; +} + +template<typename T> +struct class_operations { + + template<typename Q = T> + static std::true_type canDestruct(decltype(std::declval<Q>().~Q()) *) { + return std::true_type(); + } + + template<typename Q = T> + static std::false_type canDestruct(...) { + return std::false_type(); + } + + typedef decltype(canDestruct<T>(0)) type; + + static const bool value = type::value; /* Which is it? */ +}; + +template<typename T> +typename std::enable_if<!class_operations<T>::value, std::shared_ptr<T>>::type instantiate() { + throw std::runtime_error("Cannot instantiate class"); +} + +template<typename T> +typename std::enable_if<class_operations<T>::value, std::shared_ptr<T>>::type instantiate() { + return std::make_shared<T>(); +} + +/** + * Base component within MiNiFi + * Purpose: Many objects store a name and UUID, therefore + * the functionality is localized here to avoid duplication + */ +class CoreComponent { + + public: + + /** + * Constructor that sets the name and uuid. + */ + explicit CoreComponent(const std::string name, uuid_t uuid = 0) + : logger_(logging::Logger::getLogger()), + name_(name) { + if (!uuid) + // Generate the global UUID for the flow record + uuid_generate(uuid_); + else + uuid_copy(uuid_, uuid); + + char uuidStr[37]; + uuid_unparse_lower(uuid_, uuidStr); + uuidStr_ = uuidStr; + } + + /** + * Move Constructor. + */ + explicit CoreComponent(const CoreComponent &&other) + : name_(std::move(other.name_)), + logger_(logging::Logger::getLogger()) { + uuid_copy(uuid_, other.uuid_); + } + + // Get component name Name + std::string getName(); + + /** + * Set name. + * @param name + */ + void setName(const std::string name); + + /** + * Set UUID in this instance + * @param uuid uuid to apply to the internal representation. + */ + void setUUID(uuid_t uuid); + + /** + * Returns the UUID through the provided object. + * @param uuid uuid struct to which we will copy the memory + * @return success of request + */ + bool getUUID(uuid_t uuid); + + unsigned const char *getUUID(); + /** + * Return the UUID string + * @param constant reference to the UUID str + */ + const std::string & getUUIDStr() { + return uuidStr_; + } + + void loadComponent(){ + } + + protected: + // A global unique identifier + uuid_t uuid_; + // UUID string + std::string uuidStr_; + + // logger shared ptr + std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_; + + // Connectable's name + std::string name_; +}; + +namespace logging { +} +} +} +} +} +} + +namespace minifi = org::apache::nifi::minifi; + +namespace core = org::apache::nifi::minifi::core; + +namespace processors = org::apache::nifi::minifi::processors; + +namespace logging = org::apache::nifi::minifi::core::logging; + +namespace utils = org::apache::nifi::minifi::utils; + +namespace provenance = org::apache::nifi::minifi::provenance; + +#endif /* LIBMINIFI_INCLUDE_CORE_CORE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index e95e684..de8ceb4 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -18,7 +18,7 @@ #ifndef LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_ #define LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_ -#include "core/core.h" +#include "core/Core.h" #include "Connection.h" #include "RemoteProcessorGroupPort.h" #include "provenance/Provenance.h" @@ -28,7 +28,7 @@ #include "processors/ListenHTTP.h" #include "processors/ListenSyslog.h" #include "processors/GenerateFlowFile.h" -#include "processors/RealTimeDataCollector.h" +#include "processors/ListenHTTP.h" #include "processors/LogAttribute.h" #include "processors/ExecuteProcess.h" #include "processors/AppendHostInfo.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/FlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h index 247ad26..394b9d4 100644 --- a/libminifi/include/core/FlowFile.h +++ b/libminifi/include/core/FlowFile.h @@ -183,10 +183,9 @@ class FlowFile { std::string getUUIDStr() { return uuid_str_; } - - bool getUUID(uuid_t other) - { - uuid_copy(other,uuid_); + + bool getUUID(uuid_t other) { + uuid_copy(other, uuid_); return true; } @@ -266,7 +265,7 @@ class FlowFile { // Logger std::shared_ptr<logging::Logger> logger_; - + // Connection queue that this flow file will be transfer or current in std::shared_ptr<core::Connectable> connection_; // Orginal connection queue that this flow file was dequeued from http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/ProcessSession.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index a80769e..70805e9 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -71,17 +71,17 @@ class ProcessSession { provenance::ProvenanceReporter *getProvenanceReporter() { return provenance_report_; } -// -// Get the FlowFile from the highest priority queue + // + // Get the FlowFile from the highest priority queue std::shared_ptr<core::FlowFile> get(); -// Create a new UUID FlowFile with no content resource claim and without parent + // Create a new UUID FlowFile with no content resource claim and without parent std::shared_ptr<core::FlowFile> create(); -// Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent + // Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent std::shared_ptr<core::FlowFile> create( std::shared_ptr<core::FlowFile> &&parent); - + // Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent std::shared_ptr<core::FlowFile> create( - std::shared_ptr<core::FlowFile> &parent){ + std::shared_ptr<core::FlowFile> &parent) { return create(parent); } // Clone a new UUID FlowFile from parent both for content resource claim and attributes @@ -89,7 +89,7 @@ class ProcessSession { std::shared_ptr<core::FlowFile> &parent); // Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent, - long offset, long size); + int64_t offset, int64_t size); // Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session std::shared_ptr<core::FlowFile> duplicate( std::shared_ptr<core::FlowFile> &original); @@ -133,7 +133,8 @@ class ProcessSession { * @param stream incoming data stream that contains the data to store into a file * @param flow flow file */ - void importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow); + void importFrom(io::DataStream &stream, + std::shared_ptr<core::FlowFile> &&flow); // import from the data source. void import(std::string source, std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 4a71816..2b540ec 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -123,8 +123,7 @@ class Processor : public Connectable, public ConfigurableComponent, void setPenalizationPeriodMsec(uint64_t period) { _penalizationPeriodMsec = period; } - - + // Set Processor Maximum Concurrent Tasks void setMaxConcurrentTasks(uint8_t tasks) { max_concurrent_tasks_ = tasks; @@ -211,7 +210,6 @@ class Processor : public Connectable, public ConfigurableComponent, public: - // OnTrigger method, implemented by NiFi Processor Designer virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0; // Initialize, overridden by NiFi Process Designer @@ -235,19 +233,18 @@ class Processor : public Connectable, public ConfigurableComponent, std::atomic<uint64_t> run_durantion_nano_; // Yield Period in Milliseconds std::atomic<uint64_t> yield_period_msec_; - + // Active Tasks std::atomic<uint8_t> active_tasks_; // Trigger the Processor even if the incoming connection is empty std::atomic<bool> _triggerWhenEmpty; -private: + private: // Mutex for protection std::mutex mutex_; // Yield Expiration std::atomic<uint64_t> yield_expiration_; - // Check all incoming connections for work bool isWorkAvailable(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/ProcessorConfig.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h index 6b4a00a..c1d563e 100644 --- a/libminifi/include/core/ProcessorConfig.h +++ b/libminifi/include/core/ProcessorConfig.h @@ -17,7 +17,7 @@ #ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ #define LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ -#include "core/core.h" +#include "core/Core.h" #include "core/Property.h" namespace org { @@ -26,7 +26,6 @@ namespace nifi { namespace minifi { namespace core { - struct ProcessorConfig { std::string id; std::string name; @@ -41,13 +40,10 @@ struct ProcessorConfig { std::vector<core::Property> properties; }; - - } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ } /* namespace org */ - #endif /* LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/Repository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index e096023..6209b83 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -39,7 +39,7 @@ #include "io/Serializable.h" #include "utils/TimeUtil.h" #include "utils/StringUtils.h" -#include "core.h" +#include "Core.h" namespace org { namespace apache { @@ -72,15 +72,15 @@ class Repository : public CoreComponent { } // initialize - virtual bool initialize(){ + virtual bool initialize() { return true; } // Put - virtual bool Put(std::string key, uint8_t *buf, int bufLen){ + virtual bool Put(std::string key, uint8_t *buf, int bufLen) { return true; } // Delete - virtual bool Delete(std::string key){ + virtual bool Delete(std::string key) { return true; } @@ -89,7 +89,7 @@ class Repository : public CoreComponent { } // Run function for the thread - virtual void run(){ + virtual void run() { // no op } // Start the repository monitor thread @@ -138,12 +138,9 @@ class Repository : public CoreComponent { // size of the directory std::atomic<uint64_t> repo_size_; // Run function for the thread - void threadExecutor(){ - run(); - } - - - + void threadExecutor() { + run(); + } }; } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/RepositoryFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/RepositoryFactory.h b/libminifi/include/core/RepositoryFactory.h index 03ed524..ed9a026 100644 --- a/libminifi/include/core/RepositoryFactory.h +++ b/libminifi/include/core/RepositoryFactory.h @@ -19,9 +19,8 @@ #ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ #define LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ - #include "core/Repository.h" -#include "core.h" +#include "Core.h" namespace org { namespace apache { @@ -30,10 +29,8 @@ namespace minifi { namespace core { - std::shared_ptr<core::Repository> createRepository( - const std::string configuration_class_name, bool fail_safe = false); - - +std::shared_ptr<core::Repository> createRepository( + const std::string configuration_class_name, bool fail_safe = false); } /* namespace core */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/core.h b/libminifi/include/core/core.h deleted file mode 100644 index a70dbd4..0000000 --- a/libminifi/include/core/core.h +++ /dev/null @@ -1,180 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_CORE_CORE_H_ -#define LIBMINIFI_INCLUDE_CORE_CORE_H_ - -#include <uuid/uuid.h> -#include <cxxabi.h> -#include "core/logging/Logger.h" -/** - * namespace aliasing - */ -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { -} -namespace processors { -} -namespace provenance { - -} -namespace core { - -template<typename T> -static inline std::string getClassName() { - char *b = abi::__cxa_demangle(typeid(T).name(), 0, 0, 0); - std::string name = b; - delete [] b; - return name; -} - -template<typename T> -struct class_operations { - - template<typename Q=T> - static std::true_type canDestruct(decltype(std::declval<Q>().~Q()) *) { - return std::true_type(); - } - - - template<typename Q=T> - static std::false_type canDestruct(...) { - return std::false_type(); - } - - typedef decltype(canDestruct<T>(0)) type; - - static const bool value = type::value; /* Which is it? */ -}; - - -template<typename T> -typename std::enable_if<!class_operations<T>::value, std::shared_ptr<T>>::type instantiate() { - throw std::runtime_error("Cannot instantiate class"); -} - -template<typename T> -typename std::enable_if<class_operations<T>::value, std::shared_ptr<T>>::type instantiate() { - return std::make_shared<T>(); -} - -/** - * Base component within MiNiFi - * Purpose: Many objects store a name and UUID, therefore - * the functionality is localized here to avoid duplication - */ -class CoreComponent { - - public: - - /** - * Constructor that sets the name and uuid. - */ - explicit CoreComponent(const std::string name, uuid_t uuid = 0) - : logger_(logging::Logger::getLogger()), - name_(name) { - if (!uuid) - // Generate the global UUID for the flow record - uuid_generate(uuid_); - else - uuid_copy(uuid_, uuid); - - char uuidStr[37]; - uuid_unparse_lower(uuid_, uuidStr); - uuidStr_ = uuidStr; - } - - /** - * Move Constructor. - */ - explicit CoreComponent(const CoreComponent &&other) - : name_(std::move(other.name_)), - logger_(logging::Logger::getLogger()) { - uuid_copy(uuid_, other.uuid_); - } - - // Get component name Name - std::string getName(); - - /** - * Set name. - * @param name - */ - void setName(const std::string name); - - /** - * Set UUID in this instance - * @param uuid uuid to apply to the internal representation. - */ - void setUUID(uuid_t uuid); - - /** - * Returns the UUID through the provided object. - * @param uuid uuid struct to which we will copy the memory - * @return success of request - */ - bool getUUID(uuid_t uuid); - - unsigned const char *getUUID(); - /** - * Return the UUID string - * @param constant reference to the UUID str - */ - const std::string & getUUIDStr() { - return uuidStr_; - } - - void loadComponent(){ - } - - protected: - // A global unique identifier - uuid_t uuid_; - // UUID string - std::string uuidStr_; - - // logger shared ptr - std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_; - - // Connectable's name - std::string name_; -}; - -namespace logging { -} -} -} -} -} -} - -namespace minifi = org::apache::nifi::minifi; - -namespace core = org::apache::nifi::minifi::core; - -namespace processors = org::apache::nifi::minifi::processors; - -namespace logging = org::apache::nifi::minifi::core::logging; - -namespace utils = org::apache::nifi::minifi::utils; - -namespace provenance = org::apache::nifi::minifi::provenance; - -#endif /* LIBMINIFI_INCLUDE_CORE_CORE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/core/repository/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h index 0115588..eed1975 100644 --- a/libminifi/include/core/repository/FlowFileRepository.h +++ b/libminifi/include/core/repository/FlowFileRepository.h @@ -23,7 +23,7 @@ #include "leveldb/slice.h" #include "leveldb/status.h" #include "core/Repository.h" -#include "core/core.h" +#include "core/Core.h" #include "Connection.h" namespace org { @@ -33,8 +33,6 @@ namespace minifi { namespace core { namespace repository { - - #define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository" #define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M #define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute @@ -44,12 +42,11 @@ namespace repository { * Flow File repository * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate. */ -class FlowFileRepository : public core::Repository, public std::enable_shared_from_this<FlowFileRepository> { +class FlowFileRepository : public core::Repository, + public std::enable_shared_from_this<FlowFileRepository> { public: // Constructor - - FlowFileRepository(std::string directory, int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t purgePeriod) : Repository(core::getClassName<FlowFileRepository>(), directory, @@ -58,10 +55,12 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr { db_ = NULL; } - - FlowFileRepository() : FlowFileRepository(FLOWFILE_REPOSITORY_DIRECTORY, - MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, FLOWFILE_REPOSITORY_PURGE_PERIOD) - { + + FlowFileRepository() + : FlowFileRepository(FLOWFILE_REPOSITORY_DIRECTORY, + MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, + MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, + FLOWFILE_REPOSITORY_PURGE_PERIOD) { } // Destructor @@ -112,63 +111,60 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr } virtual void run(); - - virtual bool Put(std::string key, uint8_t *buf, int bufLen) - { - - // persistent to the DB - leveldb::Slice value((const char *) buf, bufLen); - leveldb::Status status; - status = db_->Put(leveldb::WriteOptions(), key, value); - if (status.ok()) - return true; - else - return false; + + virtual bool Put(std::string key, uint8_t *buf, int bufLen) { + + // persistent to the DB + leveldb::Slice value((const char *) buf, bufLen); + leveldb::Status status; + status = db_->Put(leveldb::WriteOptions(), key, value); + if (status.ok()) + return true; + else + return false; } /** - * - * Deletes the key - * @return status of the delete operation - */ - virtual bool Delete(std::string key) - { - leveldb::Status status; - status = db_->Delete(leveldb::WriteOptions(), key); - if (status.ok()) - return true; - else - return false; + * + * Deletes the key + * @return status of the delete operation + */ + virtual bool Delete(std::string key) { + leveldb::Status status; + status = db_->Delete(leveldb::WriteOptions(), key); + if (status.ok()) + return true; + else + return false; } /** - * Sets the value from the provided key - * @return status of the get operation. - */ - virtual bool Get(std::string key, std::string &value) - { - leveldb::Status status; - status = db_->Get(leveldb::ReadOptions(), key, &value); - if (status.ok()) - return true; - else - return false; + * Sets the value from the provided key + * @return status of the get operation. + */ + virtual bool Get(std::string key, std::string &value) { + leveldb::Status status; + status = db_->Get(leveldb::ReadOptions(), key, &value); + if (status.ok()) + return true; + else + return false; } - - void setConnectionMap(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) - { - this->connectionMap=connectionMap; + + void setConnectionMap( + std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) { + this->connectionMap = connectionMap; } void loadComponent(); - - void start() { - if (this->purge_period_ <= 0) - return; - if (running_) - return; - thread_ = std::thread(&FlowFileRepository::run, shared_from_this()); - thread_.detach(); - running_ = true; - logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); -} + + void start() { + if (this->purge_period_ <= 0) + return; + if (running_) + return; + thread_ = std::thread(&FlowFileRepository::run, shared_from_this()); + thread_.detach(); + running_ = true; + logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); + } private: std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/io/BaseStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h index b0b3589..e2c0474 100644 --- a/libminifi/include/io/BaseStream.h +++ b/libminifi/include/io/BaseStream.h @@ -30,7 +30,6 @@ namespace nifi { namespace minifi { namespace io { - class BaseStream : public DataStream, public Serializable { public: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/io/ClientSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h index 97cace2..6998950 100644 --- a/libminifi/include/io/ClientSocket.h +++ b/libminifi/include/io/ClientSocket.h @@ -26,7 +26,7 @@ #include <mutex> #include <atomic> #include "io/BaseStream.h" -#include "core/core.h" +#include "core/Core.h" #include "core/logging/Logger.h" #include "io/validation.h" @@ -70,14 +70,14 @@ class Socket : public BaseStream { */ explicit Socket(const Socket &&); - static std::string HOSTNAME; + static char *HOSTNAME; /** * Static function to return the current machine's host name */ - static std::string getMyHostName(std::string *str = &HOSTNAME) { + static std::string getMyHostName(const char *str = HOSTNAME) { if (__builtin_expect(!IsNullOrEmpty(str), 0)) - return *str; + return str; else { char hostname[1024]; gethostname(hostname, 1024); @@ -98,7 +98,7 @@ class Socket : public BaseStream { * Initializes the socket * @return result of the creation operation. */ - virtual short initialize(); + virtual int16_t initialize(); std::string getHostname() const; @@ -217,14 +217,14 @@ class Socket : public BaseStream { * Sets socket options depending on the instance. * @param sock socket file descriptor. */ - virtual short setSocketOptions(const int sock); + virtual int16_t setSocketOptions(const int sock); /** * Attempt to select the socket file descriptor * @param msec timeout interval to wait * @returns file descriptor */ - virtual short select_descriptor(const uint16_t msec); + virtual int16_t select_descriptor(const uint16_t msec); std::shared_ptr<logging::Logger> logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/io/StreamFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/StreamFactory.h b/libminifi/include/io/StreamFactory.h index faa10b5..f942f63 100644 --- a/libminifi/include/io/StreamFactory.h +++ b/libminifi/include/io/StreamFactory.h @@ -39,18 +39,18 @@ namespace io { template<typename T> class SocketCreator { - template<bool cond, typename U> - using TypeCheck = typename std::enable_if< cond, U >::type; - -public: - template<typename U = T> - TypeCheck<true, U> *create(const std::string &host, const uint16_t port) { - return new T(host, port); - } - template<typename U = T> - TypeCheck<false, U> *create(const std::string &host, const uint16_t port) { - return new Socket(host, port); - } + template<bool cond, typename U> + using TypeCheck = typename std::enable_if< cond, U >::type; + + public: + template<typename U = T> + TypeCheck<true, U> *create(const std::string &host, const uint16_t port) { + return new T(host, port); + } + template<typename U = T> + TypeCheck<false, U> *create(const std::string &host, const uint16_t port) { + return new Socket(host, port); + } }; @@ -60,73 +60,72 @@ public: **/ class StreamFactory { -public: - - /** - * Build an instance, creating a memory fence, which - * allows us to avoid locking. This is tantamount to double checked locking. - * @returns new StreamFactory; - */ - static StreamFactory *getInstance() { - StreamFactory* atomic_context = context_instance_.load( - std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_acquire); - if (atomic_context == nullptr) { - std::lock_guard < std::mutex > lock(context_mutex_); - atomic_context = context_instance_.load(std::memory_order_relaxed); - if (atomic_context == nullptr) { - atomic_context = new StreamFactory(); - std::atomic_thread_fence(std::memory_order_release); - context_instance_.store(atomic_context, - std::memory_order_relaxed); - } - } - return atomic_context; - } - - /** - * Creates a socket and returns a unique ptr - * - */ - std::unique_ptr<Socket> createSocket(const std::string &host, - const uint16_t port) { - Socket *socket = 0; - - if (is_secure_) { - socket = createSocket<TLSSocket>(host, port); - } else { - socket = createSocket<Socket>(host, port); - } - return std::unique_ptr < Socket > (socket); - } - -protected: - - /** - * Creates a socket and returns a unique ptr - * - */ - template<typename T> - Socket *createSocket(const std::string &host, const uint16_t port) { - SocketCreator<T> creator; - return creator.create(host, port); - } - - StreamFactory() : - configure_(Configure::getConfigure()) { - std::string secureStr; - is_secure_ = false; - if (configure_->get(Configure::nifi_remote_input_secure, secureStr)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool( - secureStr, is_secure_); - } - } - - bool is_secure_; - static std::atomic<StreamFactory*> context_instance_; - static std::mutex context_mutex_; - - Configure *configure_; + public: + + /** + * Build an instance, creating a memory fence, which + * allows us to avoid locking. This is tantamount to double checked locking. + * @returns new StreamFactory; + */ + static StreamFactory *getInstance() { + StreamFactory* atomic_context = context_instance_.load( + std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_acquire); + if (atomic_context == nullptr) { + std::lock_guard<std::mutex> lock(context_mutex_); + atomic_context = context_instance_.load(std::memory_order_relaxed); + if (atomic_context == nullptr) { + atomic_context = new StreamFactory(); + std::atomic_thread_fence(std::memory_order_release); + context_instance_.store(atomic_context, std::memory_order_relaxed); + } + } + return atomic_context; + } + + /** + * Creates a socket and returns a unique ptr + * + */ + std::unique_ptr<Socket> createSocket(const std::string &host, + const uint16_t port) { + Socket *socket = 0; + + if (is_secure_) { + socket = createSocket<TLSSocket>(host, port); + } else { + socket = createSocket<Socket>(host, port); + } + return std::unique_ptr<Socket>(socket); + } + + protected: + + /** + * Creates a socket and returns a unique ptr + * + */ + template<typename T> + Socket *createSocket(const std::string &host, const uint16_t port) { + SocketCreator<T> creator; + return creator.create(host, port); + } + + StreamFactory() + : configure_(Configure::getConfigure()) { + std::string secureStr; + is_secure_ = false; + if (configure_->get(Configure::nifi_remote_input_secure, secureStr)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, + is_secure_); + } + } + + bool is_secure_; + static std::atomic<StreamFactory*> context_instance_; + static std::mutex context_mutex_; + + Configure *configure_; }; } /* namespace io */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/io/tls/TLSSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h index f86f8bc..2762ba8 100644 --- a/libminifi/include/io/tls/TLSSocket.h +++ b/libminifi/include/io/tls/TLSSocket.h @@ -18,10 +18,12 @@ #ifndef LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ #define LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ +#include <openssl/ssl.h> +#include <openssl/err.h> #include <cstdint> -#include "../ClientSocket.h" #include <atomic> #include <mutex> +#include "../ClientSocket.h" #include "properties/Configure.h" @@ -31,9 +33,6 @@ namespace nifi { namespace minifi { namespace io { -#include <openssl/ssl.h> -#include <openssl/err.h> - #define TLS_ERROR_CONTEXT 1 #define TLS_ERROR_PEM_MISSING 2 #define TLS_ERROR_CERT_MISSING 3 @@ -75,11 +74,11 @@ class TLSContext { return ctx; } - short getError() { + int16_t getError() { return error_value; } - short initialize(); + int16_t initialize(); private: @@ -113,7 +112,7 @@ class TLSContext { Configure *configuration; SSL_CTX *ctx; - short error_value; + int16_t error_value; static std::atomic<TLSContext*> context_instance; static std::mutex context_mutex; @@ -151,14 +150,14 @@ class TLSSocket : public Socket { * Initializes the socket * @return result of the creation operation. */ - short initialize(); + int16_t initialize(); /** * Attempt to select the socket file descriptor * @param msec timeout interval to wait * @returns file descriptor */ - virtual short select_descriptor(const uint16_t msec); + virtual int16_t select_descriptor(const uint16_t msec); /** * Reads data and places it into buf http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/io/validation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/validation.h b/libminifi/include/io/validation.h index c66c412..9dd1b8a 100644 --- a/libminifi/include/io/validation.h +++ b/libminifi/include/io/validation.h @@ -22,7 +22,6 @@ #include <string> #include <cstring> - /** * A checker that will, at compile time, tell us * if the declared type has a size method. @@ -42,7 +41,6 @@ class size_function_functor_checker { }; }; - /** * Determines if the variable is null or ::size() == 0 */ @@ -72,9 +70,15 @@ static auto IsNullOrEmpty( /** * Determines if the variable is null or strlen(str) == 0 */ -static auto IsNullOrEmpty(char *str)-> decltype(NULL !=str, bool()) { +static auto IsNullOrEmpty(const char *str)-> decltype(NULL !=str, bool()) { return (NULL == str || strlen(str) == 0); } +/** + * Determines if the variable is null or strlen(str) == 0 + */ +static auto IsNullOrEmpty(char *str)-> decltype(NULL !=str, bool()) { + return (NULL == str || strlen(str) == 0); +} #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/processors/AppendHostInfo.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/AppendHostInfo.h b/libminifi/include/processors/AppendHostInfo.h index 6515918..a16dff3 100644 --- a/libminifi/include/processors/AppendHostInfo.h +++ b/libminifi/include/processors/AppendHostInfo.h @@ -24,7 +24,7 @@ #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { @@ -47,7 +47,7 @@ class AppendHostInfo : public core::Processor { virtual ~AppendHostInfo() { } // Processor Name - static const std::string ProcessorName; + static constexpr char const* ProcessorName = "AppendHostInfo"; // Supported Properties static core::Property InterfaceName; static core::Property HostAttribute; @@ -58,9 +58,8 @@ class AppendHostInfo : public core::Processor { public: // OnTrigger method, implemented by NiFi AppendHostInfo - virtual void onTrigger( - core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); // Initialize, over write by NiFi AppendHostInfo virtual void initialize(void); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/processors/ExecuteProcess.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ExecuteProcess.h b/libminifi/include/processors/ExecuteProcess.h index 123eed3..f74f489 100644 --- a/libminifi/include/processors/ExecuteProcess.h +++ b/libminifi/include/processors/ExecuteProcess.h @@ -34,7 +34,7 @@ #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { @@ -64,7 +64,7 @@ class ExecuteProcess : public core::Processor { kill(_pid, SIGTERM); } // Processor Name - static const std::string ProcessorName; + static constexpr char const* ProcessorName = "ExecuteProcess"; // Supported Properties static core::Property Command; static core::Property CommandArguments; @@ -91,9 +91,8 @@ class ExecuteProcess : public core::Processor { public: // OnTrigger method, implemented by NiFi ExecuteProcess - virtual void onTrigger( - core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); // Initialize, over write by NiFi ExecuteProcess virtual void initialize(void); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/processors/GenerateFlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GenerateFlowFile.h b/libminifi/include/processors/GenerateFlowFile.h index c4ab6fe..d15a02c 100644 --- a/libminifi/include/processors/GenerateFlowFile.h +++ b/libminifi/include/processors/GenerateFlowFile.h @@ -23,7 +23,7 @@ #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { @@ -48,7 +48,7 @@ class GenerateFlowFile : public core::Processor { delete[] _data; } // Processor Name - static const std::string ProcessorName; + static constexpr char const* ProcessorName = "GenerateFlowFile"; // Supported Properties static core::Property FileSize; static core::Property BatchSize; @@ -75,9 +75,8 @@ class GenerateFlowFile : public core::Processor { public: // OnTrigger method, implemented by NiFi GenerateFlowFile - virtual void onTrigger( - core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); // Initialize, over write by NiFi GenerateFlowFile virtual void initialize(void); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/processors/GetFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h index cc3beaa..5345404 100644 --- a/libminifi/include/processors/GetFile.h +++ b/libminifi/include/processors/GetFile.h @@ -22,7 +22,7 @@ #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { @@ -30,19 +30,19 @@ namespace nifi { namespace minifi { namespace processors { - struct GetFileRequest{ - std::string directory = "."; - bool recursive = true; - bool keepSourceFile = false; - int64_t minAge = 0; - int64_t maxAge = 0; - int64_t minSize = 0; - int64_t maxSize = 0; - bool ignoreHiddenFile = true; - int64_t pollInterval = 0; - int64_t batchSize = 10; - std::string fileFilter= "[^\\.].*"; - }; +struct GetFileRequest { + std::string directory = "."; + bool recursive = true; + bool keepSourceFile = false; + int64_t minAge = 0; + int64_t maxAge = 0; + int64_t minSize = 0; + int64_t maxSize = 0; + bool ignoreHiddenFile = true; + int64_t pollInterval = 0; + int64_t batchSize = 10; + std::string fileFilter = "[^\\.].*"; +}; // GetFile Class class GetFile : public core::Processor { @@ -59,7 +59,7 @@ class GetFile : public core::Processor { virtual ~GetFile() { } // Processor Name - static const std::string ProcessorName; + static constexpr char const* ProcessorName = "GetFile"; // Supported Properties static core::Property Directory; static core::Property Recurse; @@ -76,19 +76,22 @@ class GetFile : public core::Processor { static core::Relationship Success; public: - // OnTrigger method, implemented by NiFi GetFile - virtual void onTrigger( - core::ProcessContext *context, - core::ProcessSession *session); /** * Function that's executed when the processor is scheduled. * @param context process context. * @param sessionFactory process session factory that is used when creating * ProcessSession objects. */ - void onSchedule( - core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory); + void onSchedule(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory); + /** + * Execution trigger for the GetFile Processor + * @param context processor context + * @param session processor session reference. + */ + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); + // Initialize, over write by NiFi GetFile virtual void initialize(void); /** @@ -96,7 +99,7 @@ class GetFile : public core::Processor { * @param dir directory to list * @param request get file request. */ - void performListing(std::string dir,const GetFileRequest &request); + void performListing(std::string dir, const GetFileRequest &request); protected: @@ -114,9 +117,11 @@ class GetFile : public core::Processor { // Put full path file name into directory listing void putListing(std::string fileName); // Poll directory listing for files - void pollListing(std::queue<std::string> &list,const GetFileRequest &request); + void pollListing(std::queue<std::string> &list, + const GetFileRequest &request); // Check whether file can be added to the directory listing - bool acceptFile(std::string fullName, std::string name, const GetFileRequest &request); + bool acceptFile(std::string fullName, std::string name, + const GetFileRequest &request); // Get file request object. GetFileRequest request_; // Mutex for protection of the directory listing http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/processors/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h index adaefb1..69432be 100644 --- a/libminifi/include/processors/ListenHTTP.h +++ b/libminifi/include/processors/ListenHTTP.h @@ -28,7 +28,7 @@ #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { @@ -51,7 +51,7 @@ class ListenHTTP : public core::Processor { // Destructor virtual ~ListenHTTP(); // Processor Name - static const std::string ProcessorName; + static constexpr char const* ProcessorName = "ListenHTTP"; // Supported Properties static core::Property BasePath; static core::Property Port; @@ -64,20 +64,18 @@ class ListenHTTP : public core::Processor { // Supported Relationships static core::Relationship Success; - void onTrigger(core::ProcessContext *context, - core::ProcessSession *session); + void onTrigger(core::ProcessContext *context, core::ProcessSession *session); void initialize(); - void onSchedule( - core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory); + void onSchedule(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory); // HTTP request handler class Handler : public CivetHandler { public: - Handler( - core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory, - std::string &&authDNPattern, std::string &&headersAsAttributesPattern); + Handler(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory, + std::string &&authDNPattern, + std::string &&headersAsAttributesPattern); bool handlePost(CivetServer *server, struct mg_connection *conn); private: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/processors/ListenSyslog.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h index 1e1e11f..cbbdf41 100644 --- a/libminifi/include/processors/ListenSyslog.h +++ b/libminifi/include/processors/ListenSyslog.h @@ -35,7 +35,7 @@ #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { @@ -45,7 +45,7 @@ namespace processors { // SyslogEvent typedef struct { - uint8_t *payload; + char *payload; uint64_t len; } SysLogEvent; @@ -95,7 +95,7 @@ class ListenSyslog : public core::Processor { } } // Processor Name - static const std::string ProcessorName; + static constexpr char const *ProcessorName = "ListenSyslog"; // Supported Properties static core::Property RecvBufSize; static core::Property MaxSocketBufSize; @@ -125,9 +125,8 @@ class ListenSyslog : public core::Processor { public: // OnTrigger method, implemented by NiFi ListenSyslog - virtual void onTrigger( - core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); // Initialize, over write by NiFi ListenSyslog virtual void initialize(void); @@ -163,7 +162,7 @@ class ListenSyslog : public core::Processor { void putEvent(uint8_t *payload, uint64_t len) { std::lock_guard<std::mutex> lock(mutex_); SysLogEvent event; - event.payload = payload; + event.payload = reinterpret_cast<char*>(payload); event.len = len; _eventQueue.push(event); _eventQueueByteSize += len; @@ -204,7 +203,7 @@ class ListenSyslog : public core::Processor { bool _resetServerSocket; bool _serverTheadRunning; // buffer for read socket - uint8_t _buffer[2048]; + char _buffer[2048]; }; } /* namespace processors */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/processors/LogAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h index 37c0ec3..dcc802d 100644 --- a/libminifi/include/processors/LogAttribute.h +++ b/libminifi/include/processors/LogAttribute.h @@ -23,7 +23,7 @@ #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { @@ -46,7 +46,7 @@ class LogAttribute : public core::Processor { virtual ~LogAttribute() { } // Processor Name - static const std::string ProcessorName; + static constexpr char const* ProcessorName = "LogAttribute"; // Supported Properties static core::Property LogLevel; static core::Property AttributesToLog; @@ -108,9 +108,8 @@ class LogAttribute : public core::Processor { public: // OnTrigger method, implemented by NiFi LogAttribute - virtual void onTrigger( - core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); // Initialize, over write by NiFi LogAttribute virtual void initialize(void); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/processors/PutFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h index c0effaf..cc5dfca 100644 --- a/libminifi/include/processors/PutFile.h +++ b/libminifi/include/processors/PutFile.h @@ -23,7 +23,7 @@ #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { @@ -35,9 +35,11 @@ namespace processors { class PutFile : public core::Processor { public: - static const std::string CONFLICT_RESOLUTION_STRATEGY_REPLACE; - static const std::string CONFLICT_RESOLUTION_STRATEGY_IGNORE; - static const std::string CONFLICT_RESOLUTION_STRATEGY_FAIL; + static constexpr char const* CONFLICT_RESOLUTION_STRATEGY_REPLACE = "replace"; + static constexpr char const* CONFLICT_RESOLUTION_STRATEGY_IGNORE = "ignore"; + static constexpr char const* CONFLICT_RESOLUTION_STRATEGY_FAIL = "fail"; + + static constexpr char const* ProcessorName = "PutFile"; // Constructor /*! @@ -49,8 +51,7 @@ class PutFile : public core::Processor { // Destructor virtual ~PutFile() { } - // Processor Name - static const std::string ProcessorName; + // Supported Properties static core::Property Directory; static core::Property ConflictResolution; @@ -64,8 +65,8 @@ class PutFile : public core::Processor { * @param sessionFactory process session factory that is used when creating * ProcessSession objects. */ - void onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory); + virtual void onSchedule(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory); // OnTrigger method, implemented by NiFi PutFile virtual void onTrigger(core::ProcessContext *context, http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/processors/RealTimeDataCollector.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/RealTimeDataCollector.h b/libminifi/include/processors/RealTimeDataCollector.h deleted file mode 100644 index 41bd814..0000000 --- a/libminifi/include/processors/RealTimeDataCollector.h +++ /dev/null @@ -1,145 +0,0 @@ -/** - * @file RealTimeDataCollector.h - * RealTimeDataCollector class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __REAL_TIME_DATA_COLLECTOR_H__ -#define __REAL_TIME_DATA_COLLECTOR_H__ - -#include <stdio.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <fcntl.h> -#include <netdb.h> -#include <string> -#include <errno.h> -#include "FlowFileRecord.h" -#include "core/Processor.h" -#include "core/ProcessSession.h" -#include "core/core.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -// RealTimeDataCollector Class -class RealTimeDataCollector : public core::Processor { - public: - // Constructor - /*! - * Create a new processor - */ - explicit RealTimeDataCollector(std::string name, uuid_t uuid = NULL) - : core::Processor(name, uuid) { - _realTimeSocket = 0; - _batchSocket = 0; - logger_ = logging::Logger::getLogger(); - _firstInvoking = false; - _realTimeAccumulated = 0; - _batchAcccumulated = 0; - _queuedDataSize = 0; - } - // Destructor - virtual ~RealTimeDataCollector() { - if (_realTimeSocket) - close(_realTimeSocket); - if (_batchSocket) - close(_batchSocket); - if (_fileStream.is_open()) - _fileStream.close(); - } - // Processor Name - static const std::string ProcessorName; - // Supported Properties - static core::Property REALTIMESERVERNAME; - static core::Property REALTIMESERVERPORT; - static core::Property BATCHSERVERNAME; - static core::Property BATCHSERVERPORT; - static core::Property FILENAME; - static core::Property ITERATION; - static core::Property REALTIMEMSGID; - static core::Property BATCHMSGID; - static core::Property REALTIMEINTERVAL; - static core::Property BATCHINTERVAL; - static core::Property BATCHMAXBUFFERSIZE; - // Supported Relationships - static core::Relationship Success; - // Connect to the socket - int connectServer(const char *host, uint16_t port); - int sendData(int socket, const char *buf, int buflen); - void onTriggerRealTime( - core::ProcessContext *context, - core::ProcessSession *session); - void onTriggerBatch(core::ProcessContext *context, - core::ProcessSession *session); - - public: - // OnTrigger method, implemented by NiFi RealTimeDataCollector - virtual void onTrigger( - core::ProcessContext *context, - core::ProcessSession *session); - // Initialize, over write by NiFi RealTimeDataCollector - virtual void initialize(void); - - protected: - - private: - // realtime server Name - std::string _realTimeServerName; - int64_t _realTimeServerPort; - std::string _batchServerName; - int64_t _batchServerPort; - int64_t _realTimeInterval; - int64_t _batchInterval; - int64_t _batchMaxBufferSize; - // Match pattern for Real time Message ID - std::vector<std::string> _realTimeMsgID; - // Match pattern for Batch Message ID - std::vector<std::string> _batchMsgID; - // file for which the realTime collector will tail - std::string _fileName; - // Whether we need to iterate from the beginning for demo - bool _iteration; - int _realTimeSocket; - int _batchSocket; - // Logger - std::shared_ptr<logging::Logger> logger_; - // Mutex for protection - std::mutex mutex_; - // Queued data size - uint64_t _queuedDataSize; - // Queue for the batch process - std::queue<std::string> _queue; - std::thread::id _realTimeThreadId; - std::thread::id _batchThreadId; - std::atomic<bool> _firstInvoking; - int64_t _realTimeAccumulated; - int64_t _batchAcccumulated; - std::ifstream _fileStream; -}; - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/processors/TailFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h index c7b7b46..c6349a0 100644 --- a/libminifi/include/processors/TailFile.h +++ b/libminifi/include/processors/TailFile.h @@ -23,7 +23,7 @@ #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { @@ -47,7 +47,7 @@ class TailFile : public core::Processor { storeState(); } // Processor Name - static const std::string ProcessorName; + static constexpr char const* ProcessorName = "TailFile"; // Supported Properties static core::Property FileName; static core::Property StateFile; @@ -76,6 +76,8 @@ class TailFile : public core::Processor { uint64_t _currentTailFilePosition; bool _stateRecovered; uint64_t _currentTailFileCreatedTime; + static const int BUFFER_SIZE = 512; + // Utils functions for parse state file std::string trimLeft(const std::string& s); std::string trimRight(const std::string& s); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index c0d9bd4..d1d045c 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -27,7 +27,7 @@ #include <errno.h> #include <iostream> #include <fstream> -#include "core/core.h" +#include "core/Core.h" #include "core/logging/Logger.h" namespace org { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/provenance/Provenance.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h index 3d5d19e..82754c4 100644 --- a/libminifi/include/provenance/Provenance.h +++ b/libminifi/include/provenance/Provenance.h @@ -30,7 +30,6 @@ #include <thread> #include <vector> - #include "core/Repository.h" #include "core/Property.h" #include "properties/Configure.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/provenance/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h index 8dc152f..2b71fd9 100644 --- a/libminifi/include/provenance/ProvenanceRepository.h +++ b/libminifi/include/provenance/ProvenanceRepository.h @@ -22,7 +22,7 @@ #include "leveldb/slice.h" #include "leveldb/status.h" #include "core/Repository.h" -#include "core/core.h" +#include "core/Core.h" #include "provenance/Provenance.h" namespace org { namespace apache { @@ -157,7 +157,7 @@ class ProvenanceRepository : public core::Repository, } } // Run function for the thread - void run(); + void run(); // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/utils/FailurePolicy.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/FailurePolicy.h b/libminifi/include/utils/FailurePolicy.h index 98ec18a..38c82f9 100644 --- a/libminifi/include/utils/FailurePolicy.h +++ b/libminifi/include/utils/FailurePolicy.h @@ -29,23 +29,23 @@ namespace utils { */ enum FailurePolicy { - /** - * DO NOTHING - */ - NOTHING, - /** - * Return a response code from the executing function - */ - RETURN, - /** - * Throw an exception for flow control. - */ - EXCEPT, - /** - * Exit the program. This should only be used when something - * precludes us from continuing - */ - EXIT + /** + * DO NOTHING + */ + NOTHING, + /** + * Return a response code from the executing function + */ + RETURN, + /** + * Throw an exception for flow control. + */ + EXCEPT, + /** + * Exit the program. This should only be used when something + * precludes us from continuing + */ + EXIT }; } /* namespace utils */ @@ -54,5 +54,4 @@ enum FailurePolicy { } /* namespace apache */ } /* namespace org */ - #endif /* LIBMINIFI_INCLUDE_UTILS_FAILUREPOLICY_H_ */
