MINIFI-331: Apply formatter with increased line length to source. This closes #111.
Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/77a20dbe Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/77a20dbe Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/77a20dbe Branch: refs/heads/master Commit: 77a20dbe3078d5bccee7bba0d0b4f048f5440420 Parents: feec4ea Author: Marc Parisi <[email protected]> Authored: Tue Jun 6 09:22:05 2017 -0400 Committer: Aldrin Piri <[email protected]> Committed: Tue Jun 6 12:32:22 2017 -0400 ---------------------------------------------------------------------- libminifi/include/Connection.h | 11 +- libminifi/include/EventDrivenSchedulingAgent.h | 15 +- libminifi/include/Exception.h | 5 +- libminifi/include/FlowControlProtocol.h | 26 +- libminifi/include/FlowController.h | 54 +-- libminifi/include/FlowFileRecord.h | 17 +- libminifi/include/RemoteProcessorGroupPort.h | 14 +- libminifi/include/SchedulingAgent.h | 17 +- libminifi/include/Site2SiteClientProtocol.h | 161 ++++---- libminifi/include/Site2SitePeer.h | 17 +- libminifi/include/ThreadedSchedulingAgent.h | 12 +- libminifi/include/TimerDrivenSchedulingAgent.h | 12 +- .../include/controllers/SSLContextService.h | 74 ++-- libminifi/include/core/ClassLoader.h | 23 +- libminifi/include/core/ConfigurationFactory.h | 26 +- libminifi/include/core/Connectable.h | 7 +- libminifi/include/core/Core.h | 6 +- libminifi/include/core/FlowConfiguration.h | 31 +- libminifi/include/core/FlowFile.h | 4 +- libminifi/include/core/ProcessContext.h | 14 +- libminifi/include/core/ProcessGroup.h | 22 +- libminifi/include/core/ProcessSession.h | 107 +++-- libminifi/include/core/Processor.h | 14 +- libminifi/include/core/ProcessorNode.h | 15 +- libminifi/include/core/Property.h | 24 +- libminifi/include/core/Repository.h | 9 +- libminifi/include/core/RepositoryFactory.h | 3 +- libminifi/include/core/Resource.h | 3 +- 
.../core/controller/ControllerServiceLookup.h | 6 +- .../core/controller/ControllerServiceMap.h | 12 +- .../core/controller/ControllerServiceNode.h | 6 +- .../core/controller/ControllerServiceProvider.h | 110 ++---- .../controller/StandardControllerServiceNode.h | 15 +- .../StandardControllerServiceProvider.h | 90 ++--- libminifi/include/core/logging/Logger.h | 40 +- .../include/core/logging/LoggerConfiguration.h | 63 +-- .../SiteToSiteProvenanceReportingTask.h | 20 +- .../core/repository/FlowFileRepository.h | 46 +-- .../core/repository/VolatileRepository.h | 41 +- libminifi/include/core/yaml/YamlConfiguration.h | 59 +-- libminifi/include/io/BaseStream.h | 20 +- libminifi/include/io/CRCStream.h | 50 +-- libminifi/include/io/ClientSocket.h | 26 +- libminifi/include/io/DataStream.h | 9 +- libminifi/include/io/EndianCheck.h | 2 +- libminifi/include/io/Serializable.h | 18 +- libminifi/include/io/StreamFactory.h | 6 +- libminifi/include/io/TLSSocket.h | 12 +- libminifi/include/io/tls/TLSSocket.h | 49 +-- libminifi/include/io/validation.h | 12 +- libminifi/include/processors/AppendHostInfo.h | 6 +- libminifi/include/processors/ExecuteProcess.h | 8 +- libminifi/include/processors/GenerateFlowFile.h | 3 +- libminifi/include/processors/GetFile.h | 22 +- libminifi/include/processors/InvokeHTTP.h | 19 +- libminifi/include/processors/ListenHTTP.h | 14 +- libminifi/include/processors/ListenSyslog.h | 9 +- libminifi/include/processors/LoadProcessors.h | 1 - libminifi/include/processors/LogAttribute.h | 3 +- libminifi/include/processors/PutFile.h | 19 +- libminifi/include/processors/TailFile.h | 6 +- libminifi/include/properties/Properties.h | 4 +- libminifi/include/provenance/Provenance.h | 73 ++-- .../include/provenance/ProvenanceRepository.h | 114 +++--- libminifi/include/utils/ByteInputCallBack.h | 6 +- libminifi/include/utils/StringUtils.h | 22 +- libminifi/include/utils/ThreadPool.h | 18 +- libminifi/include/utils/TimeUtil.h | 9 +- libminifi/src/Configure.cpp | 57 +-- 
libminifi/src/Connection.cpp | 22 +- libminifi/src/EventDrivenSchedulingAgent.cpp | 14 +- libminifi/src/FlowControlProtocol.cpp | 84 ++-- libminifi/src/FlowController.cpp | 147 +++---- libminifi/src/FlowFileRecord.cpp | 43 +- libminifi/src/Properties.cpp | 13 +- libminifi/src/RemoteProcessorGroupPort.cpp | 45 +-- libminifi/src/SchedulingAgent.cpp | 21 +- libminifi/src/Site2SiteClientProtocol.cpp | 396 +++++++------------ libminifi/src/Site2SitePeer.cpp | 4 +- libminifi/src/ThreadedSchedulingAgent.cpp | 70 ++-- libminifi/src/TimerDrivenSchedulingAgent.cpp | 17 +- libminifi/src/controllers/SSLContextService.cpp | 41 +- libminifi/src/core/ClassLoader.cpp | 10 +- libminifi/src/core/ConfigurableComponent.cpp | 33 +- libminifi/src/core/ConfigurationFactory.cpp | 36 +- libminifi/src/core/Connectable.cpp | 36 +- libminifi/src/core/FlowConfiguration.cpp | 32 +- libminifi/src/core/FlowFile.cpp | 3 +- libminifi/src/core/ProcessGroup.cpp | 68 ++-- libminifi/src/core/ProcessSession.cpp | 296 +++++--------- libminifi/src/core/Processor.cpp | 61 +-- libminifi/src/core/ProcessorNode.cpp | 3 +- libminifi/src/core/RepositoryFactory.cpp | 18 +- .../core/controller/ControllerServiceNode.cpp | 2 - .../controller/ControllerServiceProvider.cpp | 3 +- .../StandardControllerServiceNode.cpp | 9 +- .../src/core/logging/LoggerConfiguration.cpp | 31 +- .../SiteToSiteProvenanceReportingTask.cpp | 35 +- .../src/core/repository/FlowFileRepository.cpp | 29 +- .../src/core/repository/VolatileRepository.cpp | 7 +- libminifi/src/core/yaml/YamlConfiguration.cpp | 255 ++++-------- libminifi/src/io/BaseStream.cpp | 47 +-- libminifi/src/io/ClientSocket.cpp | 45 +-- libminifi/src/io/DataStream.cpp | 12 +- libminifi/src/io/Serializable.cpp | 33 +- libminifi/src/io/tls/TLSSocket.cpp | 70 ++-- libminifi/src/processors/AppendHostInfo.cpp | 27 +- libminifi/src/processors/ExecuteProcess.cpp | 83 ++-- libminifi/src/processors/GenerateFlowFile.cpp | 31 +- libminifi/src/processors/GetFile.cpp | 105 ++--- 
libminifi/src/processors/InvokeHTTP.cpp | 265 ++++--------- libminifi/src/processors/ListenHTTP.cpp | 120 ++---- libminifi/src/processors/ListenSyslog.cpp | 68 +--- libminifi/src/processors/LogAttribute.cpp | 41 +- libminifi/src/processors/PutFile.cpp | 62 +-- libminifi/src/processors/TailFile.cpp | 60 +-- libminifi/src/provenance/Provenance.cpp | 109 ++--- .../src/provenance/ProvenanceRepository.cpp | 10 +- libminifi/test/Server.cpp | 58 +-- libminifi/test/TestBase.h | 67 ++-- .../ControllerServiceIntegrationTests.cpp | 91 ++--- .../test/integration/HttpGetIntegrationTest.cpp | 46 +-- .../integration/HttpPostIntegrationTest.cpp | 41 +- .../integration/ProvenanceReportingTest.cpp | 48 +-- .../test/integration/TestExecuteProcess.cpp | 56 +-- libminifi/test/nodefs/NoLevelDB.cpp | 6 +- libminifi/test/unit/CRCTests.cpp | 15 +- libminifi/test/unit/ClassLoaderTests.cpp | 20 +- libminifi/test/unit/ControllerServiceTests.cpp | 26 +- libminifi/test/unit/InvokeHTTPTests.cpp | 120 ++---- .../test/unit/LoggerConfigurationTests.cpp | 4 +- libminifi/test/unit/LoggerTests.cpp | 30 +- libminifi/test/unit/MockClasses.h | 11 +- libminifi/test/unit/ProcessorTests.cpp | 93 ++--- libminifi/test/unit/PropertyTests.cpp | 24 +- libminifi/test/unit/ProvenanceTestHelper.h | 43 +- libminifi/test/unit/ProvenanceTests.cpp | 52 +-- libminifi/test/unit/RepoTests.cpp | 15 +- libminifi/test/unit/Site2SiteTests.cpp | 36 +- libminifi/test/unit/SiteToSiteHelper.h | 9 +- libminifi/test/unit/StringUtilsTests.cpp | 11 +- libminifi/test/unit/YamlConfigurationTests.cpp | 15 +- 142 files changed, 1996 insertions(+), 3881 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/Connection.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h index 8fe42d0..be51fce 100644 --- 
a/libminifi/include/Connection.h +++ b/libminifi/include/Connection.h @@ -41,17 +41,13 @@ namespace nifi { namespace minifi { // Connection Class -class Connection : public core::Connectable, - public std::enable_shared_from_this<Connection> { +class Connection : public core::Connectable, public std::enable_shared_from_this<Connection> { public: // Constructor /* * Create a new processor */ - explicit Connection(std::shared_ptr<core::Repository> flow_repository, - std::string name, uuid_t uuid = NULL, uuid_t srcUUID = - NULL, - uuid_t destUUID = NULL); + explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, uuid_t destUUID = NULL); // Destructor virtual ~Connection() { } @@ -137,8 +133,7 @@ class Connection : public core::Connectable, // Put the flow file into queue void put(std::shared_ptr<core::FlowFile> flow); // Poll the flow file from queue, the expired flow file record also being returned - std::shared_ptr<core::FlowFile> poll( - std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords); + std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords); // Drain the flow records void drain(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/EventDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index 2e49ddf..6a63dc5 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -38,27 +38,20 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { /*! * Create a new event driven scheduling agent. 
*/ - EventDrivenSchedulingAgent( - std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, - std::shared_ptr<core::Repository> repo, - std::shared_ptr<Configure> configuration) - : ThreadedSchedulingAgent(controller_service_provider, repo, - configuration) { + EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration) + : ThreadedSchedulingAgent(controller_service_provider, repo, configuration) { } // Destructor virtual ~EventDrivenSchedulingAgent() { } // Run function for the thread - void run(std::shared_ptr<core::Processor> processor, - core::ProcessContext *processContext, - core::ProcessSessionFactory *sessionFactory); + void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory); private: // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent); - EventDrivenSchedulingAgent &operator=( - const EventDrivenSchedulingAgent &parent); + EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent); }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/Exception.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h index 88a3ed2..080d1bf 100644 --- a/libminifi/include/Exception.h +++ b/libminifi/include/Exception.h @@ -44,9 +44,8 @@ enum ExceptionType { }; // Exception String -static const char *ExceptionStr[MAX_EXCEPTION] = { "File Operation", - "Flow File Operation", "Processor Operation", "Process Session Operation", - "Process Schedule Operation", "Site2Site Protocol", "General Operation" }; +static const char *ExceptionStr[MAX_EXCEPTION] = { "File 
Operation", "Flow File Operation", "Processor Operation", "Process Session Operation", "Process Schedule Operation", "Site2Site Protocol", + "General Operation" }; // Exception Type to String inline const char *ExceptionTypeToString(ExceptionType type) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/FlowControlProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h index 73399ae..8992049 100644 --- a/libminifi/include/FlowControlProtocol.h +++ b/libminifi/include/FlowControlProtocol.h @@ -58,8 +58,7 @@ typedef enum { } FlowControlMsgType; // FlowControl Protocol Msg Type String -static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = { - "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" }; +static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = { "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" }; // Flow Control Msg Type to String inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) { @@ -91,10 +90,8 @@ typedef enum { } FlowControlMsgID; // FlowControl Protocol Msg ID String -static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { - "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT", - "REPORT_INTERVAL", "PROCESSOR_NAME" - "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" }; +static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT", "REPORT_INTERVAL", "PROCESSOR_NAME" + "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" }; #define TYPE_HDR_LEN 4 // Fix Hdr Type #define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes @@ -130,9 +127,7 @@ typedef enum { } FlowControlRespCode; // FlowControl Resp Code str -static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS", - "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER", - "RESP_STOP_FLOW_CONTROLLER", 
"RESP_FAILURE" }; +static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS", "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER", "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" }; // Flow Control Resp Code to String inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) { @@ -157,7 +152,8 @@ class FlowControlProtocol { /*! * Create a new control protocol */ - FlowControlProtocol(FlowController *controller, const std::shared_ptr<Configure> &configure) : logger_(logging::LoggerFactory<FlowControlProtocol>::getLogger()) { + FlowControlProtocol(FlowController *controller, const std::shared_ptr<Configure> &configure) + : logger_(logging::LoggerFactory<FlowControlProtocol>::getLogger()) { _controller = controller; _socket = 0; _serverName = "localhost"; @@ -175,17 +171,13 @@ class FlowControlProtocol { _serverName = value; logger_->log_info("NiFi Server Name %s", _serverName.c_str()); } - if (configure->get(Configure::nifi_server_port, value) - && core::Property::StringToInt(value, _serverPort)) { + if (configure->get(Configure::nifi_server_port, value) && core::Property::StringToInt(value, _serverPort)) { logger_->log_info("NiFi Server Port: [%d]", _serverPort); } if (configure->get(Configure::nifi_server_report_interval, value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, _reportInterval, unit) - && core::Property::ConvertTimeUnitToMS(_reportInterval, unit, - _reportInterval)) { - logger_->log_info("NiFi server report interval: [%d] ms", - _reportInterval); + if (core::Property::StringToTime(value, _reportInterval, unit) && core::Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval)) { + logger_->log_info("NiFi server report interval: [%d] ms", _reportInterval); } } else _reportInterval = 0; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git 
a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 59865d4..dc0d610 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -58,8 +58,7 @@ namespace minifi { * Flow Controller class. Generally used by FlowController factory * as a singleton. */ -class FlowController : public core::controller::ControllerServiceProvider, - public std::enable_shared_from_this<FlowController> { +class FlowController : public core::controller::ControllerServiceProvider, public std::enable_shared_from_this<FlowController> { public: static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; @@ -67,9 +66,7 @@ class FlowController : public core::controller::ControllerServiceProvider, /** * Flow controller constructor */ - FlowController(std::shared_ptr<core::Repository> provenance_repo, - std::shared_ptr<core::Repository> flow_file_repo, - std::shared_ptr<Configure> configure, + FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, const std::string name = DEFAULT_ROOT_GROUP_NAME, bool headless_mode = false); @@ -125,8 +122,7 @@ class FlowController : public core::controller::ControllerServiceProvider, // Load new xml virtual void reload(std::string yamlFile); // update property value - void updatePropertyValue(std::string processorName, std::string propertyName, - std::string propertyValue) { + void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { if (root_ != nullptr) root_->updatePropertyValue(processorName, propertyName, propertyValue); } @@ -142,9 +138,8 @@ class FlowController : public core::controller::ControllerServiceProvider, * @param id service identifier * @param firstTimeAdded first time this CS was added */ - virtual 
std::shared_ptr<core::controller::ControllerServiceNode> createControllerService( - const std::string &type, const std::string &id, - bool firstTimeAdded); + virtual std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &type, const std::string &id, + bool firstTimeAdded); /** * controller service provider @@ -154,29 +149,25 @@ class FlowController : public core::controller::ControllerServiceProvider, * @param serviceNode service node to be removed. */ - virtual void removeControllerService( - const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual void removeControllerService(const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Enables the controller service services * @param serviceNode service node which will be disabled, along with linked services. */ - virtual void enableControllerService( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual void enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Enables controller services * @param serviceNoden vector of service nodes which will be enabled, along with linked services. */ - virtual void enableControllerServices( - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes); + virtual void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes); /** * Disables controller services * @param serviceNode service node which will be disabled, along with linked services. */ - virtual void disableControllerService( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Gets all controller services. 
@@ -188,38 +179,32 @@ class FlowController : public core::controller::ControllerServiceProvider, * @param id service identifier * @return shared pointer to the controller service node or nullptr if it does not exist. */ - virtual std::shared_ptr<core::controller::ControllerServiceNode> getControllerServiceNode( - const std::string &id); + virtual std::shared_ptr<core::controller::ControllerServiceNode> getControllerServiceNode(const std::string &id); - virtual void verifyCanStopReferencingComponents( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Unschedules referencing components. */ - virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Verify can disable referencing components * @param serviceNode service node whose referenced components will be scheduled. */ - virtual void verifyCanDisableReferencingServices( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Disables referencing components * @param serviceNode service node whose referenced components will be scheduled. 
*/ - virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Verify can enable referencing components * @param serviceNode service node whose referenced components will be scheduled. */ - virtual void verifyCanEnableReferencingServices( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Determines if the controller service specified by identifier is enabled. @@ -230,22 +215,19 @@ class FlowController : public core::controller::ControllerServiceProvider, * Enables referencing components * @param serviceNode service node whose referenced components will be scheduled. */ - virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Schedules referencing components * @param serviceNode service node whose referenced components will be scheduled. 
*/ - virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Returns controller service components referenced by serviceIdentifier from the embedded * controller service provider; */ - std::shared_ptr<core::controller::ControllerService> getControllerServiceForComponent( - const std::string &serviceIdentifier, const std::string &componentId); + std::shared_ptr<core::controller::ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const std::string &componentId); /** * Enables all controller services for the provider. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/FlowFileRecord.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h index 1d41b60..400cedc 100644 --- a/libminifi/include/FlowFileRecord.h +++ b/libminifi/include/FlowFileRecord.h @@ -67,9 +67,7 @@ enum FlowAttribute { }; // FlowFile Attribute Key -static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = { "path", - "absolute.path", "filename", "uuid", "priority", "mime.type", - "discard.reason", "alternate.identifier" }; +static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = { "path", "absolute.path", "filename", "uuid", "priority", "mime.type", "discard.reason", "alternate.identifier" }; // FlowFile Attribute Enum to Key inline const char *FlowAttributeKey(FlowAttribute attribute) { @@ -96,22 +94,17 @@ class FlowFileRecord : public core::FlowFile, public io::Serializable { /* * Create a new flow record */ - explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, - 
std::map<std::string, std::string> attributes, - std::shared_ptr<ResourceClaim> claim = nullptr); + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::map<std::string, std::string> attributes, std::shared_ptr<ResourceClaim> claim = nullptr); - explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, - std::shared_ptr<core::FlowFile> &event); + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::FlowFile> &event); - explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, - std::shared_ptr<core::FlowFile> &event, - const std::string &uuidConnection); + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection); explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository) : FlowFile(), flow_repository_(flow_repository), snapshot_(""), - logger_(logging::LoggerFactory<FlowFileRecord>::getLogger()) { + logger_(logging::LoggerFactory<FlowFileRecord>::getLogger()) { } // Destructor http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index 1bdbb38..9f89b07 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -42,9 +42,7 @@ class RemoteProcessorGroupPort : public core::Processor { /*! 
* Create a new processor */ - RemoteProcessorGroupPort( - const std::shared_ptr<io::StreamFactory> &stream_factory, - std::string name, uuid_t uuid = nullptr) + RemoteProcessorGroupPort(const std::shared_ptr<io::StreamFactory> &stream_factory, std::string name, uuid_t uuid = nullptr) : core::Processor(name, uuid), direction_(SEND), transmitting_(false), @@ -59,19 +57,17 @@ class RemoteProcessorGroupPort : public core::Processor { } // Processor Name - static const std::string ProcessorName; + static const char *ProcessorName; // Supported Properties static core::Property hostName; static core::Property port; static core::Property portUUID; // Supported Relationships static core::Relationship relation; - public: - void onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory); + public: + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); // OnTrigger method, implemented by NiFi RemoteProcessorGroupPort - virtual void onTrigger(core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); // Initialize, over write by NiFi RemoteProcessorGroupPort virtual void initialize(void); // Set Direction http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 37e26c6..22f79db 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -52,9 +52,7 @@ class SchedulingAgent { /*! * Create a new scheduling agent. 
*/ - SchedulingAgent( - std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, - std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration) + SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration) : configure_(configuration), admin_yield_duration_(0), bored_yield_duration_(0), @@ -62,8 +60,7 @@ class SchedulingAgent { logger_(logging::LoggerFactory<SchedulingAgent>::getLogger()) { running_ = false; repo_ = repo; - utils::ThreadPool<bool> pool = utils::ThreadPool<bool>( - configure_->getInt(Configure::nifi_flow_engine_threads, 8), true); + utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true); component_lifecycle_thread_pool_ = std::move(pool); component_lifecycle_thread_pool_.start(); } @@ -72,9 +69,7 @@ class SchedulingAgent { } // onTrigger, return whether the yield is need - bool onTrigger(std::shared_ptr<core::Processor> processor, - core::ProcessContext *processContext, - core::ProcessSessionFactory *sessionFactory); + bool onTrigger(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory); // Whether agent has work to do bool hasWorkToDo(std::shared_ptr<core::Processor> processor); // Whether the outgoing need to be backpressure @@ -91,10 +86,8 @@ class SchedulingAgent { } public: - virtual void enableControllerService( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); - virtual void disableControllerService( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual void enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual void disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); // schedule, 
overwritten by different DrivenSchedulingAgent virtual void schedule(std::shared_ptr<core::Processor> processor) = 0; // unschedule, overwritten by different DrivenSchedulingAgent http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h index 6f5a462..209f6b4 100644 --- a/libminifi/include/Site2SiteClientProtocol.h +++ b/libminifi/include/Site2SiteClientProtocol.h @@ -131,9 +131,7 @@ typedef enum { } RequestType; // Request Type Str -static const char *RequestTypeStr[MAX_REQUEST_TYPE] = { - "NEGOTIATE_FLOWFILE_CODEC", "REQUEST_PEER_LIST", "SEND_FLOWFILES", - "RECEIVE_FLOWFILES", "SHUTDOWN" }; +static const char *RequestTypeStr[MAX_REQUEST_TYPE] = { "NEGOTIATE_FLOWFILE_CODEC", "REQUEST_PEER_LIST", "SEND_FLOWFILES", "RECEIVE_FLOWFILES", "SHUTDOWN" }; // Respond Code typedef enum { @@ -171,32 +169,33 @@ typedef enum { // Respond Code Class typedef struct { RespondCode code; - const char *description; - bool hasDescription; + const char *description;bool hasDescription; } RespondCodeContext; // Respond Code Context -static RespondCodeContext respondCodeContext[] = { { RESERVED, - "Reserved for Future Use", false }, - { PROPERTIES_OK, "Properties OK", false }, { UNKNOWN_PROPERTY_NAME, - "Unknown Property Name", true }, { ILLEGAL_PROPERTY_VALUE, - "Illegal Property Value", true }, { MISSING_PROPERTY, - "Missing Property", true }, { CONTINUE_TRANSACTION, - "Continue Transaction", false }, { FINISH_TRANSACTION, - "Finish Transaction", false }, { CONFIRM_TRANSACTION, - "Confirm Transaction", true }, { TRANSACTION_FINISHED, - "Transaction Finished", false }, { - TRANSACTION_FINISHED_BUT_DESTINATION_FULL, - "Transaction Finished But Destination is Full", false }, { - CANCEL_TRANSACTION, "Cancel Transaction", true }, { BAD_CHECKSUM, - "Bad Checksum", 
false }, { MORE_DATA, "More Data Exists", false }, { - NO_MORE_DATA, "No More Data Exists", false }, { UNKNOWN_PORT, - "Unknown Port", false }, { PORT_NOT_IN_VALID_STATE, - "Port Not in a Valid State", true }, { PORTS_DESTINATION_FULL, - "Port's Destination is Full", false }, { UNAUTHORIZED, - "User Not Authorized", true }, { ABORT, "Abort", true }, { - UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, { - END_OF_STREAM, "End of Stream", false } }; +static RespondCodeContext respondCodeContext[] = { + { RESERVED, "Reserved for Future Use", false }, + { PROPERTIES_OK, "Properties OK", false }, + { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true }, + { ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true }, + { MISSING_PROPERTY, "Missing Property", true }, + { CONTINUE_TRANSACTION, "Continue Transaction", false }, + { FINISH_TRANSACTION, "Finish Transaction", false }, + { CONFIRM_TRANSACTION, "Confirm Transaction", true }, + { TRANSACTION_FINISHED, "Transaction Finished", false }, + { TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction Finished But Destination is Full", false }, + { CANCEL_TRANSACTION, "Cancel Transaction", true }, + { BAD_CHECKSUM, "Bad Checksum", false }, + { MORE_DATA, "More Data Exists", false }, + { NO_MORE_DATA, "No More Data Exists", false }, + { UNKNOWN_PORT, "Unknown Port", false }, + { PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true }, + { PORTS_DESTINATION_FULL, "Port's Destination is Full", false }, + { UNAUTHORIZED, "User Not Authorized", true }, + { ABORT, "Abort", true }, + { UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, + { END_OF_STREAM, "End of Stream", false } +}; // Respond Code Sequence Pattern static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R'; @@ -246,54 +245,52 @@ typedef enum { // HandShakeProperty Str static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = { -/** - * Boolean value indicating whether or not the contents of a FlowFile should - * be 
GZipped when transferred. - */ -"GZIP", -/** - * The unique identifier of the port to communicate with - */ -"PORT_IDENTIFIER", -/** - * Indicates the number of milliseconds after the request was made that the - * client will wait for a response. If no response has been received by the - * time this value expires, the server can move on without attempting to - * service the request because the client will have already disconnected. - */ -"REQUEST_EXPIRATION_MILLIS", -/** - * The preferred number of FlowFiles that the server should send to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. - */ -"BATCH_COUNT", -/** - * The preferred number of bytes that the server should send to the client - * when pulling data. This property was introduced in version 5 of the - * protocol. - */ -"BATCH_SIZE", -/** - * The preferred amount of time that the server should send data to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. Value is in milliseconds. - */ -"BATCH_DURATION" }; + /** + * Boolean value indicating whether or not the contents of a FlowFile should + * be GZipped when transferred. + */ + "GZIP", + /** + * The unique identifier of the port to communicate with + */ + "PORT_IDENTIFIER", + /** + * Indicates the number of milliseconds after the request was made that the + * client will wait for a response. If no response has been received by the + * time this value expires, the server can move on without attempting to + * service the request because the client will have already disconnected. + */ + "REQUEST_EXPIRATION_MILLIS", + /** + * The preferred number of FlowFiles that the server should send to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. + */ + "BATCH_COUNT", + /** + * The preferred number of bytes that the server should send to the client + * when pulling data. 
This property was introduced in version 5 of the + * protocol. + */ + "BATCH_SIZE", + /** + * The preferred amount of time that the server should send data to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. Value is in milliseconds. + */ + "BATCH_DURATION" }; class Site2SiteClientProtocol; // Transaction Class class Transaction { friend class Site2SiteClientProtocol; - public: + public: // Constructor /*! * Create a new transaction */ - explicit Transaction( - TransferDirection direction, - org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> &stream) + explicit Transaction(TransferDirection direction, org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> &stream) : crcStream(std::move(stream)) { _state = TRANSACTION_STARTED; _direction = direction; @@ -375,9 +372,8 @@ class Transaction { */ class DataPacket { public: - DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction, - std::map<std::string, std::string> attributes, std::string &payload) : - payload_ (payload) { + DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction, std::map<std::string, std::string> attributes, std::string &payload) + : payload_(payload) { _protocol = protocol; _size = 0; _transaction = transaction; @@ -398,7 +394,8 @@ class Site2SiteClientProtocol { /*! 
* Create a new control protocol */ - Site2SiteClientProtocol(std::unique_ptr<Site2SitePeer> peer) : logger_(logging::LoggerFactory<Site2SiteClientProtocol>::getLogger()) { + Site2SiteClientProtocol(std::unique_ptr<Site2SitePeer> peer) + : logger_(logging::LoggerFactory<Site2SiteClientProtocol>::getLogger()) { peer_ = std::move(peer); _batchSize = 0; _batchCount = 0; @@ -500,8 +497,7 @@ class Site2SiteClientProtocol { int writeRespond(RespondCode code, std::string message); // getRespondCodeContext RespondCodeContext *getRespondCodeContext(RespondCode code) { - for (unsigned int i = 0; - i < sizeof(respondCodeContext) / sizeof(RespondCodeContext); i++) { + for (unsigned int i = 0; i < sizeof(respondCodeContext) / sizeof(RespondCodeContext); i++) { if (respondCodeContext[i].code == code) { return &respondCodeContext[i]; } @@ -511,16 +507,13 @@ class Site2SiteClientProtocol { // Creation of a new transaction, return the transaction ID if success, // Return NULL when any error occurs - Transaction *createTransaction(std::string &transactionID, - TransferDirection direction); + Transaction *createTransaction(std::string &transactionID, TransferDirection direction); // Receive the data packet from the transaction // Return false when any error occurs bool receive(std::string transactionID, DataPacket *packet, bool &eof); // Send the data packet from the transaction // Return false when any error occurs - bool send(std::string transactionID, DataPacket *packet, - std::shared_ptr<FlowFileRecord> flowFile, - core::ProcessSession *session); + bool send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session); // Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received. 
bool confirm(std::string transactionID); // Cancel the transaction @@ -530,14 +523,11 @@ class Site2SiteClientProtocol { // Error the transaction void error(std::string transactionID); // Receive flow files for the process session - void receiveFlowFiles(core::ProcessContext *context, - core::ProcessSession *session); + void receiveFlowFiles(core::ProcessContext *context, core::ProcessSession *session); // Transfer flow files for the process session - void transferFlowFiles(core::ProcessContext *context, - core::ProcessSession *session); + void transferFlowFiles(core::ProcessContext *context, core::ProcessSession *session); //! Transfer string for the process session - void transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload, - std::map<std::string, std::string> attributes); + void transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload, std::map<std::string, std::string> attributes); // deleteTransaction void deleteTransaction(std::string transactionID); // Nest Callback Class for write stream @@ -554,8 +544,7 @@ class Site2SiteClientProtocol { int size = std::min(len, (int) sizeof(buffer)); int ret = _packet->_transaction->getStream().readData(buffer, size); if (ret != size) { - _packet->_protocol->logger_->log_error( - "Site2Site Receive Flow Size %d Failed %d", size, ret); + _packet->_protocol->logger_->log_error("Site2Site Receive Flow Size %d Failed %d", size, ret); break; } stream->write((const char *) buffer, size); @@ -579,11 +568,9 @@ class Site2SiteClientProtocol { readSize = stream->gcount(); else readSize = 8192; - int ret = _packet->_transaction->getStream().writeData(buffer, - readSize); + int ret = _packet->_transaction->getStream().writeData(buffer, readSize); if (ret != readSize) { - _packet->_protocol->logger_->log_error( - "Site2Site Send Flow Size %d Failed %d", readSize, ret); + _packet->_protocol->logger_->log_error("Site2Site Send Flow Size %d Failed %d", 
readSize, ret); break; } _packet->_size += readSize; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/Site2SitePeer.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SitePeer.h b/libminifi/include/Site2SitePeer.h index ab8d09b..65a5479 100644 --- a/libminifi/include/Site2SitePeer.h +++ b/libminifi/include/Site2SitePeer.h @@ -58,9 +58,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { /* * Create a new site2site peer */ - explicit Site2SitePeer( - std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket, - const std::string host_, uint16_t port_) + explicit Site2SitePeer(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket, const std::string host_, uint16_t port_) : host_(host_), port_(port_), stream_(injected_socket.release()), @@ -147,8 +145,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { // whether need be to yield bool isYield(std::string portId) { std::lock_guard<std::mutex> lock(mutex_); - std::map<std::string, uint64_t>::iterator it = this - ->_yieldExpirationPortIdMap.find(portId); + std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId); if (it != _yieldExpirationPortIdMap.end()) { uint64_t yieldExpiration = it->second; return (yieldExpiration >= getTimeMillis()); @@ -159,8 +156,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { // clear yield expiration void clearYield(std::string portId) { std::lock_guard<std::mutex> lock(mutex_); - std::map<std::string, uint64_t>::iterator it = this - ->_yieldExpirationPortIdMap.find(portId); + std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId); if (it != _yieldExpirationPortIdMap.end()) { _yieldExpirationPortIdMap.erase(portId); } @@ -219,9 +215,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { 
return Serializable::read(value, stream_.get()); } int readUTF(std::string &str, bool widen = false) { - return org::apache::nifi::minifi::io::Serializable::readUTF(str, - stream_.get(), - widen); + return org::apache::nifi::minifi::io::Serializable::readUTF(str, stream_.get(), widen); } // open connection to the peer bool Open(); @@ -232,8 +226,7 @@ class Site2SitePeer : public org::apache::nifi::minifi::io::BaseStream { * Move assignment operator. */ Site2SitePeer& operator=(Site2SitePeer&& other) { - stream_ = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>( - other.stream_.release()); + stream_ = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(other.stream_.release()); host_ = std::move(other.host_); port_ = std::move(other.port_); _yieldExpiration = 0; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/ThreadedSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index 21bbbd0..50ab6c9 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -42,20 +42,16 @@ class ThreadedSchedulingAgent : public SchedulingAgent { /*! * Create a new threaded scheduling agent. 
*/ - ThreadedSchedulingAgent( - std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, - std::shared_ptr<core::Repository> repo, - std::shared_ptr<Configure> configuration) - : SchedulingAgent(controller_service_provider, repo, configuration), logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) { + ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration) + : SchedulingAgent(controller_service_provider, repo, configuration), + logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) { } // Destructor virtual ~ThreadedSchedulingAgent() { } // Run function for the thread - virtual void run(std::shared_ptr<core::Processor> processor, - core::ProcessContext *processContext, - core::ProcessSessionFactory *sessionFactory) = 0; + virtual void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0; public: // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/TimerDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index 74096ee..597dc76 100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -37,10 +37,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { /*! 
* Create a new processor */ - TimerDrivenSchedulingAgent( - std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, - std::shared_ptr<core::Repository> repo, - std::shared_ptr<Configure> configure) + TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configure) : ThreadedSchedulingAgent(controller_service_provider, repo, configure) { } // Destructor @@ -49,16 +46,13 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { /** * Run function that accepts the processor, context and session factory. */ - void run(std::shared_ptr<core::Processor> processor, - core::ProcessContext *processContext, - core::ProcessSessionFactory *sessionFactory); + void run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory); private: // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent); - TimerDrivenSchedulingAgent &operator=( - const TimerDrivenSchedulingAgent &parent); + TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent); }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/controllers/SSLContextService.h ---------------------------------------------------------------------- diff --git a/libminifi/include/controllers/SSLContextService.h b/libminifi/include/controllers/SSLContextService.h index 7de26b4..9093d5f 100644 --- a/libminifi/include/controllers/SSLContextService.h +++ b/libminifi/include/controllers/SSLContextService.h @@ -63,13 +63,15 @@ class SSLContextService : public core::controller::ControllerService { : ControllerService(name, id), initialized_(false), valid_(false), - logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {} 
+ logger_(logging::LoggerFactory<SSLContextService>::getLogger()) { + } explicit SSLContextService(const std::string &name, uuid_t uuid = 0) : ControllerService(name, uuid), initialized_(false), valid_(false), - logger_(logging::LoggerFactory<SSLContextService>::getLogger()) {} + logger_(logging::LoggerFactory<SSLContextService>::getLogger()) { + } virtual void initialize(); @@ -97,43 +99,35 @@ class SSLContextService : public core::controller::ControllerService { return false; } - bool configure_ssl_context(SSL_CTX *ctx) - { - if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) - <= 0) { - logger_->log_error("Could not create load certificate, error : %s", - std::strerror(errno)); - return false; - } - if (!IsNullOrEmpty(passphrase_)) { - SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_); - SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); - } - - int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), - SSL_FILETYPE_PEM); - if (retp != 1) { - logger_->log_error("Could not create load private key,%i on %s error : %s", - retp, private_key_, std::strerror(errno)); - return false; - } - - if (!SSL_CTX_check_private_key(ctx)) { - logger_->log_error( - "Private key does not match the public certificate, error : %s", - std::strerror(errno)); - return false; - } - - retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0); - if (retp == 0) { - logger_->log_error("Can not load CA certificate, Exiting, error : %s", - std::strerror(errno)); - return false; - } - - return true; - } + bool configure_ssl_context(SSL_CTX *ctx) { + if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) { + logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno)); + return false; + } + if (!IsNullOrEmpty(passphrase_)) { + SSL_CTX_set_default_passwd_cb_userdata(ctx, &passphrase_); + SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); + } + + int retp = 
SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM); + if (retp != 1) { + logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, std::strerror(errno)); + return false; + } + + if (!SSL_CTX_check_private_key(ctx)) { + logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno)); + return false; + } + + retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0); + if (retp == 0) { + logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno)); + return false; + } + + return true; + } protected: @@ -164,7 +158,7 @@ class SSLContextService : public core::controller::ControllerService { std::string ca_certificate_; private: - std::shared_ptr<logging::Logger> logger_; + std::shared_ptr<logging::Logger> logger_; }; typedef int (SSLContextService::*ptr)(char *, int, int, void *); REGISTER_RESOURCE(SSLContextService); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ClassLoader.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h index 9f8fcae..31292b2 100644 --- a/libminifi/include/core/ClassLoader.h +++ b/libminifi/include/core/ClassLoader.h @@ -62,8 +62,7 @@ class ObjectFactory { /** * Create a shared pointer to a new processor. */ - virtual std::shared_ptr<Connectable> create(const std::string &name, - uuid_t uuid) { + virtual std::shared_ptr<Connectable> create(const std::string &name, uuid_t uuid) { return nullptr; } @@ -111,8 +110,7 @@ class DefautObjectFactory : public ObjectFactory { /** * Create a shared pointer to a new processor. 
*/ - virtual std::shared_ptr<Connectable> create(const std::string &name, - uuid_t uuid) { + virtual std::shared_ptr<Connectable> create(const std::string &name, uuid_t uuid) { std::shared_ptr<T> ptr = std::make_shared<T>(name, uuid); return std::static_pointer_cast<Connectable>(ptr); } @@ -177,15 +175,13 @@ class ClassLoader { /** * Register a class with the give ProcessorFactory */ - void registerClass(const std::string &name, - std::unique_ptr<ObjectFactory> factory) { - if (loaded_factories_.find(name) != loaded_factories_.end()){ + void registerClass(const std::string &name, std::unique_ptr<ObjectFactory> factory) { + if (loaded_factories_.find(name) != loaded_factories_.end()) { return; } std::lock_guard<std::mutex> lock(internal_mutex_); - loaded_factories_.insert(std::make_pair(name, std::move(factory))); } @@ -196,8 +192,7 @@ class ClassLoader { * @return nullptr or object created from class_name definition. */ template<class T = Connectable> - std::shared_ptr<T> instantiate(const std::string &class_name, - const std::string &name); + std::shared_ptr<T> instantiate(const std::string &class_name, const std::string &name); /** * Instantiate object based on class_name @@ -217,12 +212,11 @@ class ClassLoader { std::vector<void *> dl_handles_; private: - std::shared_ptr<logging::Logger> logger_; + std::shared_ptr<logging::Logger> logger_; }; template<class T> -std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, - const std::string &name) { +std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, const std::string &name) { std::lock_guard<std::mutex> lock(internal_mutex_); auto factory_entry = loaded_factories_.find(class_name); if (factory_entry != loaded_factories_.end()) { @@ -234,8 +228,7 @@ std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, } template<class T> -std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, - uuid_t uuid) { +std::shared_ptr<T> 
ClassLoader::instantiate(const std::string &class_name, uuid_t uuid) { std::lock_guard<std::mutex> lock(internal_mutex_); auto factory_entry = loaded_factories_.find(class_name); if (factory_entry != loaded_factories_.end()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ConfigurationFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h index ed0bdb5..b58c170 100644 --- a/libminifi/include/core/ConfigurationFactory.h +++ b/libminifi/include/core/ConfigurationFactory.h @@ -29,19 +29,16 @@ namespace minifi { namespace core { template<typename T> -typename std::enable_if<!class_operations<T>::value, T*>::type instantiate( - const std::shared_ptr<core::Repository> &repo, - const std::shared_ptr<core::Repository> &flow_file_repo, - std::shared_ptr<Configure> configuration, const std::string path) { +typename std::enable_if<!class_operations<T>::value, T*>::type instantiate(const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo, + std::shared_ptr<Configure> configuration, + const std::string path) { throw std::runtime_error("Cannot instantiate class"); } template<typename T> -typename std::enable_if<class_operations<T>::value, T*>::type instantiate( - const std::shared_ptr<core::Repository> &repo, - const std::shared_ptr<core::Repository> &flow_file_repo, - const std::shared_ptr<io::StreamFactory> &stream_factory, - std::shared_ptr<Configure> configuration, const std::string path) { +typename std::enable_if<class_operations<T>::value, T*>::type instantiate(const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo, + const std::shared_ptr<io::StreamFactory> &stream_factory, + std::shared_ptr<Configure> configuration, const std::string path) { return new T(repo, flow_file_repo, stream_factory, 
configuration, path); } @@ -49,13 +46,10 @@ typename std::enable_if<class_operations<T>::value, T*>::type instantiate( * Configuration factory is used to create a new FlowConfiguration * object. */ -std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( - std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo, - std::shared_ptr<Configure> configure, - std::shared_ptr<io::StreamFactory> stream_factory, - const std::string configuration_class_name, const std::string path = "", - bool fail_safe = false); +std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, + std::shared_ptr<io::StreamFactory> stream_factory, + const std::string configuration_class_name, const std::string path = "", + bool fail_safe = false); } /* namespace core */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/Connectable.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h index 76536f4..150b5fc 100644 --- a/libminifi/include/core/Connectable.h +++ b/libminifi/include/core/Connectable.h @@ -68,8 +68,7 @@ class Connectable : public CoreComponent { * Get outgoing connection based on relationship * @return set of outgoing connections. 
*/ - std::set<std::shared_ptr<Connectable>> getOutGoingConnections( - std::string relationship); + std::set<std::shared_ptr<Connectable>> getOutGoingConnections(std::string relationship); /** * Get next incoming connection @@ -147,7 +146,7 @@ class Connectable : public CoreComponent { // Incoming connections std::set<std::shared_ptr<Connectable>> _incomingConnections; // Outgoing connections map based on Relationship name - std::map<std::string, std::set<std::shared_ptr<Connectable>>> out_going_connections_; + std::map<std::string, std::set<std::shared_ptr<Connectable>>>out_going_connections_; // Mutex for protection std::mutex relationship_mutex_; @@ -163,7 +162,7 @@ class Connectable : public CoreComponent { // Concurrent condition variable for whether there is incoming work to do std::condition_variable work_condition_; - private: +private: std::shared_ptr<logging::Logger> logger_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/Core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index a9fb435..f100d8b 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -74,10 +74,9 @@ typename std::enable_if<!class_operations<T>::value, std::shared_ptr<T>>::type i template<typename T> typename std::enable_if<class_operations<T>::value, std::shared_ptr<T>>::type instantiate(const std::string name = "") { - if (name.length() == 0){ + if (name.length() == 0) { return std::make_shared<T>(); - } - else{ + } else { return std::make_shared<T>(name); } } @@ -107,7 +106,6 @@ class CoreComponent { uuidStr_ = uuidStr; } - /** * Move Constructor. 
*/ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 1fee8c5..edcb2b6 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -58,9 +58,7 @@ class FlowConfiguration : public CoreComponent { * Constructor that will be used for configuring * the flow controller. */ - explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo, - std::shared_ptr<io::StreamFactory> stream_factory, + explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<io::StreamFactory> stream_factory, std::shared_ptr<Configure> configuration, const std::string path) : CoreComponent(core::getClassName<FlowConfiguration>()), @@ -68,31 +66,23 @@ class FlowConfiguration : public CoreComponent { config_path_(path), stream_factory_(stream_factory), logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) { - controller_services_ = std::make_shared< - core::controller::ControllerServiceMap>(); - service_provider_ = std::make_shared< - core::controller::StandardControllerServiceProvider>( - controller_services_, nullptr, configuration); + controller_services_ = std::make_shared<core::controller::ControllerServiceMap>(); + service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration); } virtual ~FlowConfiguration(); // Create Processor (Node/Input/Output Port) based on the name - std::shared_ptr<core::Processor> createProcessor(std::string name, - uuid_t uuid); + std::shared_ptr<core::Processor> createProcessor(std::string name, uuid_t uuid); // Create Root Processor Group - 
std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, - uuid_t uuid); + std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, uuid_t uuid); - std::shared_ptr<core::controller::ControllerServiceNode> createControllerService( - const std::string &class_name, const std::string &name, uuid_t uuid); + std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid); // Create Remote Processor Group - std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name, - uuid_t uuid); + std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name, uuid_t uuid); // Create Connection - std::shared_ptr<minifi::Connection> createConnection(std::string name, - uuid_t uuid); + std::shared_ptr<minifi::Connection> createConnection(std::string name, uuid_t uuid); // Create Provenance Report Task std::shared_ptr<core::Processor> createProvenanceReportTask(void); @@ -113,8 +103,7 @@ class FlowConfiguration : public CoreComponent { * @return Extensions should return a non-null pointer in order to * properly configure flow controller. 
*/ - virtual std::unique_ptr<core::ProcessGroup> getRoot( - const std::string &from_config) { + virtual std::unique_ptr<core::ProcessGroup> getRoot(const std::string &from_config) { return nullptr; } @@ -134,7 +123,7 @@ class FlowConfiguration : public CoreComponent { std::shared_ptr<core::Repository> flow_file_repo_; // stream factory std::shared_ptr<io::StreamFactory> stream_factory_; - + private: std::shared_ptr<logging::Logger> logger_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/FlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h index 050d15f..a0b8dec 100644 --- a/libminifi/include/core/FlowFile.h +++ b/libminifi/include/core/FlowFile.h @@ -191,9 +191,7 @@ class FlowFile { // Check whether it is still being penalized bool isPenalized() { - return ( - penaltyExpiration_ms_ > 0 ? - penaltyExpiration_ms_ > getTimeMillis() : false); + return (penaltyExpiration_ms_ > 0 ? penaltyExpiration_ms_ > getTimeMillis() : false); } /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ProcessContext.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h index 8094e05..48e0108 100644 --- a/libminifi/include/core/ProcessContext.h +++ b/libminifi/include/core/ProcessContext.h @@ -46,12 +46,10 @@ class ProcessContext : public controller::ControllerServiceLookup { /*! 
* Create a new process context associated with the processor/controller service/state manager */ - ProcessContext( - ProcessorNode &processor, - std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, - std::shared_ptr<core::Repository> repo) + ProcessContext(ProcessorNode &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, std::shared_ptr<core::Repository> repo) : processor_node_(processor), - controller_service_provider_(controller_service_provider), logger_(logging::LoggerFactory<ProcessContext>::getLogger()) { + controller_service_provider_(controller_service_provider), + logger_(logging::LoggerFactory<ProcessContext>::getLogger()) { repo_ = repo; } // Destructor @@ -106,10 +104,8 @@ class ProcessContext : public controller::ControllerServiceLookup { * @return the ControllerService that is registered with the given * identifier */ - std::shared_ptr<core::controller::ControllerService> getControllerService( - const std::string &identifier) { - return controller_service_provider_->getControllerServiceForComponent( - identifier, processor_node_.getUUIDStr()); + std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) { + return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_.getUUIDStr()); } /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index ccf744e..f54f5b4 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -55,8 +55,7 @@ class ProcessGroup { /*! 
* Create a new process group */ - ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, - ProcessGroup *parent = NULL); + ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, ProcessGroup *parent = NULL); // Destructor virtual ~ProcessGroup(); // Set Processor Name @@ -111,11 +110,9 @@ class ProcessGroup { return false; } // Start Processing - void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, - EventDrivenSchedulingAgent *eventScheduler); + void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler); // Stop Processing - void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, - EventDrivenSchedulingAgent *eventScheduler); + void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler); // Whether it is root process group bool isRootProcessGroup(); // set parent process group @@ -147,26 +144,21 @@ class ProcessGroup { * @param nodeId node identifier * @param node controller service node. */ - void addControllerService( - const std::string &nodeId, - std::shared_ptr<core::controller::ControllerServiceNode> &node); + void addControllerService(const std::string &nodeId, std::shared_ptr<core::controller::ControllerServiceNode> &node); /** * Find controllerservice node will search child groups until the nodeId is found. * @param node node identifier * @return controller service node, if it exists. 
*/ - std::shared_ptr<core::controller::ControllerServiceNode> findControllerService( - const std::string &nodeId); + std::shared_ptr<core::controller::ControllerServiceNode> findControllerService(const std::string &nodeId); // removeConnection void removeConnection(std::shared_ptr<Connection> connection); // update property value - void updatePropertyValue(std::string processorName, std::string propertyName, - std::string propertyValue); + void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue); - void getConnections( - std::map<std::string, std::shared_ptr<Connection>> &connectionMap); + void getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap); protected: // A global unique identifier http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ProcessSession.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index 1f6f234..4d20f59 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -48,13 +48,11 @@ class ProcessSession { * Create a new process session */ ProcessSession(ProcessContext *processContext = NULL) - : process_context_(processContext), logger_(logging::LoggerFactory<ProcessSession>::getLogger()) { - logger_->log_trace("ProcessSession created for %s", - process_context_->getProcessorNode().getName().c_str()); + : process_context_(processContext), + logger_(logging::LoggerFactory<ProcessSession>::getLogger()) { + logger_->log_trace("ProcessSession created for %s", process_context_->getProcessorNode().getName().c_str()); auto repo = processContext->getProvenanceRepository(); - provenance_report_ = new provenance::ProvenanceReporter( - repo, process_context_->getProcessorNode().getUUIDStr(), - process_context_->getProcessorNode().getName()); + provenance_report_ = new 
provenance::ProvenanceReporter(repo, process_context_->getProcessorNode().getUUIDStr(), process_context_->getProcessorNode().getName()); } // Destructor @@ -64,9 +62,9 @@ class ProcessSession { } // Commit the session void commit(); -// Roll Back the session + // Roll Back the session void rollback(); -// Get Provenance Report + // Get Provenance Report provenance::ProvenanceReporter *getProvenanceReporter() { return provenance_report_; } @@ -76,54 +74,39 @@ class ProcessSession { // Create a new UUID FlowFile with no content resource claim and without parent std::shared_ptr<core::FlowFile> create(); // Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent - std::shared_ptr<core::FlowFile> create( - std::shared_ptr<core::FlowFile> &&parent); + std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &&parent); // Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent - std::shared_ptr<core::FlowFile> create( - std::shared_ptr<core::FlowFile> &parent) { + std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> &parent) { return create(parent); } // Clone a new UUID FlowFile from parent both for content resource claim and attributes - std::shared_ptr<core::FlowFile> clone( - std::shared_ptr<core::FlowFile> &parent); -// Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim - std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent, - int64_t offset, int64_t size); -// Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session - std::shared_ptr<core::FlowFile> duplicate( - std::shared_ptr<core::FlowFile> &original); -// Transfer the FlowFile to the relationship - void transfer(std::shared_ptr<core::FlowFile> &flow, - Relationship relationship); - void transfer(std::shared_ptr<core::FlowFile> &&flow, - Relationship relationship); -// 
Put Attribute - void putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, - std::string value); - void putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, - std::string value); -// Remove Attribute + std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent); + // Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim + std::shared_ptr<core::FlowFile> clone(std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size); + // Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session + std::shared_ptr<core::FlowFile> duplicate(std::shared_ptr<core::FlowFile> &original); + // Transfer the FlowFile to the relationship + void transfer(std::shared_ptr<core::FlowFile> &flow, Relationship relationship); + void transfer(std::shared_ptr<core::FlowFile> &&flow, Relationship relationship); + // Put Attribute + void putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value); + void putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value); + // Remove Attribute void removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key); void removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key); -// Remove Flow File + // Remove Flow File void remove(std::shared_ptr<core::FlowFile> &flow); void remove(std::shared_ptr<core::FlowFile> &&flow); -// Execute the given read callback against the content - void read(std::shared_ptr<core::FlowFile> &flow, - InputStreamCallback *callback); - void read(std::shared_ptr<core::FlowFile> &&flow, - InputStreamCallback *callback); -// Execute the given write callback against the content - void write(std::shared_ptr<core::FlowFile> &flow, - OutputStreamCallback *callback); - void write(std::shared_ptr<core::FlowFile> &&flow, - OutputStreamCallback *callback); -// Execute the given write/append callback 
against the content - void append(std::shared_ptr<core::FlowFile> &flow, - OutputStreamCallback *callback); - void append(std::shared_ptr<core::FlowFile> &&flow, - OutputStreamCallback *callback); -// Penalize the flow + // Execute the given read callback against the content + void read(std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback); + void read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCallback *callback); + // Execute the given write callback against the content + void write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback); + void write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback); + // Execute the given write/append callback against the content + void append(std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback); + void append(std::shared_ptr<core::FlowFile> &&flow, OutputStreamCallback *callback); + // Penalize the flow void penalize(std::shared_ptr<core::FlowFile> &flow); void penalize(std::shared_ptr<core::FlowFile> &&flow); @@ -132,13 +115,14 @@ class ProcessSession { * @param stream incoming data stream that contains the data to store into a file * @param flow flow file */ - void importFrom(io::DataStream &stream, - std::shared_ptr<core::FlowFile> &&flow); + void importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> &&flow); // import from the data source. 
void import(std::string source, std::shared_ptr<core::FlowFile> &flow, - bool keepSource = true, uint64_t offset = 0); + bool keepSource = true, + uint64_t offset = 0); void import(std::string source, std::shared_ptr<core::FlowFile> &&flow, - bool keepSource = true, uint64_t offset = 0); + bool keepSource = true, + uint64_t offset = 0); // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer @@ -148,26 +132,25 @@ class ProcessSession { protected: // FlowFiles being modified by current process session std::map<std::string, std::shared_ptr<core::FlowFile> > _updatedFlowFiles; -// Copy of the original FlowFiles being modified by current process session as above + // Copy of the original FlowFiles being modified by current process session as above std::map<std::string, std::shared_ptr<core::FlowFile> > _originalFlowFiles; -// FlowFiles being added by current process session + // FlowFiles being added by current process session std::map<std::string, std::shared_ptr<core::FlowFile> > _addedFlowFiles; -// FlowFiles being deleted by current process session + // FlowFiles being deleted by current process session std::map<std::string, std::shared_ptr<core::FlowFile> > _deletedFlowFiles; -// FlowFiles being transfered to the relationship + // FlowFiles being transfered to the relationship std::map<std::string, Relationship> _transferRelationship; -// FlowFiles being cloned for multiple connections per relationship + // FlowFiles being cloned for multiple connections per relationship std::map<std::string, std::shared_ptr<core::FlowFile> > _clonedFlowFiles; private: // Clone the flow file during transfer to multiple connections for a relationship - std::shared_ptr<core::FlowFile> cloneDuringTransfer( - std::shared_ptr<core::FlowFile> &parent); -// ProcessContext + std::shared_ptr<core::FlowFile> cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent); + // ProcessContext ProcessContext *process_context_; -// Logger + // 
Logger std::shared_ptr<logging::Logger> logger_; -// Provenance Report + // Provenance Report provenance::ProvenanceReporter *provenance_report_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 9a19072..251ec47 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -62,8 +62,7 @@ namespace core { #define DEFAULT_PENALIZATION_PERIOD_SECONDS 30 // Processor Class -class Processor : public Connectable, public ConfigurableComponent, - public std::enable_shared_from_this<Processor> { +class Processor : public Connectable, public ConfigurableComponent, public std::enable_shared_from_this<Processor> { public: // Constructor @@ -192,8 +191,7 @@ class Processor : public Connectable, public ConfigurableComponent, bool flowFilesOutGoingFull(); // Get outgoing connections based on relationship name - std::set<std::shared_ptr<Connection> > getOutGoingConnections( - std::string relationship); + std::set<std::shared_ptr<Connection> > getOutGoingConnections(std::string relationship); // Add connection bool addConnection(std::shared_ptr<Connectable> connection); // Remove connection @@ -205,8 +203,7 @@ class Processor : public Connectable, public ConfigurableComponent, // Get the Next RoundRobin incoming connection std::shared_ptr<Connection> getNextIncomingConnection(); // On Trigger - void onTrigger(ProcessContext *context, - ProcessSessionFactory *sessionFactory); + void onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory); virtual bool canEdit() { return !isRunning(); @@ -220,8 +217,7 @@ class Processor : public Connectable, public ConfigurableComponent, virtual void initialize() { } // Scheduled event hook, overridden by NiFi Process Designer - virtual void onSchedule(ProcessContext *context, - 
ProcessSessionFactory *sessionFactory) { + virtual void onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory) { } protected: @@ -243,7 +239,7 @@ class Processor : public Connectable, public ConfigurableComponent, // Trigger the Processor even if the incoming connection is empty std::atomic<bool> _triggerWhenEmpty; - private: + private: // Mutex for protection std::mutex mutex_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/include/core/ProcessorNode.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h index 8836f62..9bf1f52 100644 --- a/libminifi/include/core/ProcessorNode.h +++ b/libminifi/include/core/ProcessorNode.h @@ -60,8 +60,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { * @return result of getting property. */ bool getProperty(const std::string name, std::string &value) { - const std::shared_ptr<ConfigurableComponent> processor_cast = - std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + const std::shared_ptr<ConfigurableComponent> processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_); if (nullptr != processor_cast) return processor_cast->getProperty(name, value); else { @@ -75,8 +74,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { * @return result of setting property. 
*/ bool setProperty(const std::string name, std::string value) { - const std::shared_ptr<ConfigurableComponent> processor_cast = - std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + const std::shared_ptr<ConfigurableComponent> processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_); bool ret = ConfigurableComponent::setProperty(name, value); if (nullptr != processor_cast) ret = processor_cast->setProperty(name, value); @@ -92,8 +90,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { * @return whether property was set or not */ bool setProperty(Property &prop, std::string value) { - const std::shared_ptr<ConfigurableComponent> processor_cast = - std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + const std::shared_ptr<ConfigurableComponent> processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_); bool ret = ConfigurableComponent::setProperty(prop, value); if (nullptr != processor_cast) ret = processor_cast->setProperty(prop, value); @@ -107,8 +104,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { * @return result of set operation. */ bool setSupportedProperties(std::set<Property> properties) { - const std::shared_ptr<ConfigurableComponent> processor_cast = - std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + const std::shared_ptr<ConfigurableComponent> processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_); bool ret = ConfigurableComponent::setSupportedProperties(properties); if (nullptr != processor_cast) ret = processor_cast->setSupportedProperties(properties); @@ -164,8 +160,7 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { * Get outgoing connection based on relationship * @return set of outgoing connections. 
*/ - std::set<std::shared_ptr<Connectable>> getOutGoingConnections( - std::string relationship) { + std::set<std::shared_ptr<Connectable>> getOutGoingConnections(std::string relationship) { return processor_->getOutGoingConnections(relationship); }
