http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/controller/ControllerServiceProvider.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h new file mode 100644 index 0000000..a749475 --- /dev/null +++ b/libminifi/include/core/controller/ControllerServiceProvider.h @@ -0,0 +1,306 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_ +#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_ + +#include <vector> +#include "core/Core.h" +#include "ControllerServiceLookup.h" +#include "core/ConfigurableComponent.h" +#include "ControllerServiceNode.h" +#include "ControllerServiceMap.h" +#include "core/ClassLoader.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { + +class ControllerServiceProvider : public CoreComponent, + public ConfigurableComponent, public ControllerServiceLookup { + public: + + explicit ControllerServiceProvider(const std::string &name) + : CoreComponent(name), + ConfigurableComponent(logging::Logger::getLogger()) { + controller_map_ = std::make_shared<ControllerServiceMap>(); + } + + explicit ControllerServiceProvider( + std::shared_ptr<ControllerServiceMap> services) + : CoreComponent(core::getClassName<ControllerServiceProvider>()), + ConfigurableComponent(logging::Logger::getLogger()), + controller_map_(services) { + } + + explicit ControllerServiceProvider( + const std::string &name, std::shared_ptr<ControllerServiceMap> services) + : CoreComponent(name), + ConfigurableComponent(logging::Logger::getLogger()), + controller_map_(services) { + } + + explicit ControllerServiceProvider(const ControllerServiceProvider &&other) + : CoreComponent(std::move(other)), + ConfigurableComponent(std::move(other)), + controller_map_(std::move(other.controller_map_)) { + } + + virtual ~ControllerServiceProvider() { + } + + /** + * Creates a controller service node wrapping the controller service + * + * @param type service type. + * @param id controller service identifier. + * @return shared pointer to the controller service node. 
+ */ + virtual std::shared_ptr<ControllerServiceNode> createControllerService( + const std::string &type, const std::string &id, + bool firstTimeAdded) = 0; + /** + * Gets a controller service node wrapping the controller service + * + * @param type service type. + * @param id controller service identifier. + * @return shared pointer to the controller service node. + */ + virtual std::shared_ptr<ControllerServiceNode> getControllerServiceNode( + const std::string &id) { + return controller_map_->getControllerServiceNode(id); + } + + /** + * Removes a controller service. + * @param serviceNode controller service node. + */ + virtual void removeControllerService( + const std::shared_ptr<ControllerServiceNode> &serviceNode) { + controller_map_->removeControllerService(serviceNode); + } + + /** + * Enables the provided controller service + * @param serviceNode controller service node. + */ + virtual void enableControllerService( + std::shared_ptr<ControllerServiceNode> &serviceNode) = 0; + + /** + * Enables the provided controller service nodes + * @param serviceNode controller service node. + */ + virtual void enableControllerServices( + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) = 0; + + /** + * Disables the provided controller service node + * @param serviceNode controller service node. + */ + virtual void disableControllerService( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; + + /** + * Gets a list of all controller services. + */ + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices() { + return controller_map_->getAllControllerServices(); + } + + /** + * Verifies that referencing components can be stopped for the controller service + */ + virtual void verifyCanStopReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; + + /** + * Unschedules referencing components. 
+ */ + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; + + /** + * Verifies referencing components for <code>serviceNode</code> can be disabled. + * @param serviceNode shared pointer to a controller service node. + */ + virtual void verifyCanDisableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; + + /** + * Disables referencing components for <code>serviceNode</code> can be disabled. + * @param serviceNode shared pointer to a controller service node. + */ + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + return std::vector<std::shared_ptr<core::controller::ControllerServiceNode>>(); + } + + /** + * Verifies referencing components for <code>serviceNode</code> can be enabled. + * @param serviceNode shared pointer to a controller service node. + */ + virtual void verifyCanEnableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = + findLinkedComponents(serviceNode); + for (auto ref : references) { + ref->canEnable(); + } + } + + /** + * Enables referencing components for <code>serviceNode</code> can be Enabled. + * @param serviceNode shared pointer to a controller service node. + */ + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; + + /** + * Schedules the service node and referencing components. + * @param serviceNode shared pointer to a controller service node. 
+ */ + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; + + /** + * Returns a controller service for the service identifier and componentID + * @param service Identifier service identifier. + */ + virtual std::shared_ptr<ControllerService> getControllerServiceForComponent( + const std::string &serviceIdentifier, const std::string &componentId) { + std::shared_ptr<ControllerService> node = getControllerService( + serviceIdentifier); + return node; + } + + /** + * Gets the controller service for the provided identifier + * @param identifier service identifier. + */ + virtual std::shared_ptr<ControllerService> getControllerService( + const std::string &identifier); + + /** + * Determines if Controller service is enabled. + * @param identifier service identifier. + */ + virtual bool isControllerServiceEnabled(const std::string &identifier) { + std::shared_ptr<ControllerServiceNode> node = getControllerServiceNode( + identifier); + if (nullptr != node) { + return linkedServicesAre(ENABLED, node); + } else + return false; + } + + /** + * Determines if Controller service is being enabled. + * @param identifier service identifier. + */ + virtual bool isControllerServiceEnabling(const std::string &identifier) { + std::shared_ptr<ControllerServiceNode> node = getControllerServiceNode( + identifier); + if (nullptr != node) { + return linkedServicesAre(ENABLING, node); + } else + return false; + } + + virtual const std::string getControllerServiceName( + const std::string &identifier) { + std::shared_ptr<ControllerService> node = getControllerService(identifier); + if (nullptr != node) { + return node->getName(); + } else + return ""; + } + + virtual void enableAllControllerServices() = 0; + + protected: + + /** + * verifies that linked services match the provided state. 
+ */ + inline bool linkedServicesAre( + ControllerServiceState state, + const std::shared_ptr<ControllerServiceNode> &node) { + if (node->getControllerServiceImplementation()->getState() == state) { + for (auto child_service : node->getLinkedControllerServices()) { + if (child_service->getControllerServiceImplementation()->getState() + != state) { + return false; + } + } + return true; + } else { + return false; + } + } + + bool canEdit() { + return true; + } + + /** + * Finds linked components + * @param referenceNode reference node from whcih we will find linked references. + */ + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> findLinkedComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &referenceNode) { + + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references; + + for (std::shared_ptr<core::controller::ControllerServiceNode> linked_node : referenceNode + ->getLinkedControllerServices()) { + references.push_back(linked_node); + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> linked_references = + findLinkedComponents(linked_node); + + auto removal_predicate = + [&linked_references](std::shared_ptr<core::controller::ControllerServiceNode> key) ->bool + { + return std::find(linked_references.begin(), linked_references.end(), key) != linked_references.end(); + }; + + references.erase( + std::remove_if(references.begin(), references.end(), + removal_predicate), + references.end()); + + references.insert(std::end(references), linked_references.begin(), + linked_references.end()); + } + return references; + } + + std::shared_ptr<ControllerServiceMap> controller_map_; + +}; + +} /* namespace controller */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/controller/StandardControllerServiceNode.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/StandardControllerServiceNode.h b/libminifi/include/core/controller/StandardControllerServiceNode.h new file mode 100644 index 0000000..f599217 --- /dev/null +++ b/libminifi/include/core/controller/StandardControllerServiceNode.h @@ -0,0 +1,107 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICENODE_H_ +#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICENODE_H_ + +#include "core/Core.h" +#include "ControllerServiceNode.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { + +class StandardControllerServiceNode : public ControllerServiceNode { + public: + + explicit StandardControllerServiceNode( + std::shared_ptr<ControllerService> service, + std::shared_ptr<ControllerServiceProvider> provider, + const std::string &id, std::shared_ptr<Configure> configuration) + : ControllerServiceNode(service, id, configuration), + provider(provider) { + } + + explicit StandardControllerServiceNode( + std::shared_ptr<ControllerService> service, const std::string &id, + std::shared_ptr<Configure> configuration) + : ControllerServiceNode(service, id, configuration), + provider(nullptr) { + } + + std::shared_ptr<core::ProcessGroup> &getProcessGroup(); + + void setProcessGroup(std::shared_ptr<ProcessGroup> &processGroup); + + StandardControllerServiceNode(const StandardControllerServiceNode &other) = delete; + StandardControllerServiceNode &operator=( + const StandardControllerServiceNode &parent) = delete; + + /** + * Initializes the controller service node. + */ + virtual void initialize() { + ControllerServiceNode::initialize(); + active = false; + } + + bool canEnable() { + if (!active.load()) { + for (auto linked_service : linked_controller_services_) { + if (!linked_service->canEnable()) { + return false; + } + } + return true; + } else { + return false; + } + } + + bool enable(); + + bool disable() { + controller_service_->setState(DISABLED); + active = false; + return true; + } + + protected: + + // controller service provider. + std::shared_ptr<ControllerServiceProvider> provider; + + // process group. 
+ std::shared_ptr<core::ProcessGroup> process_group_; + + std::mutex mutex_; + +}; + +} /* namespace controller */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICENODE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/controller/StandardControllerServiceProvider.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h new file mode 100644 index 0000000..ba8af56 --- /dev/null +++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h @@ -0,0 +1,229 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDStandardControllerServiceProvider_H_ +#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDStandardControllerServiceProvider_H_ + +#include <iostream> +#include <memory> +#include <vector> +#include "core/ProcessGroup.h" +#include "SchedulingAgent.h" +#include "core/ClassLoader.h" +#include "ControllerService.h" +#include "ControllerServiceMap.h" +#include "ControllerServiceNode.h" +#include "StandardControllerServiceNode.h" +#include "ControllerServiceProvider.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace controller { + +class StandardControllerServiceProvider : public ControllerServiceProvider, + public std::enable_shared_from_this<StandardControllerServiceProvider> { + public: + + explicit StandardControllerServiceProvider( + std::shared_ptr<ControllerServiceMap> services, + std::shared_ptr<ProcessGroup> root_group, std::shared_ptr<Configure> configuration, + std::shared_ptr<minifi::SchedulingAgent> agent, ClassLoader &loader = + ClassLoader::getDefaultClassLoader()) + : ControllerServiceProvider(services), + root_group_(root_group), + agent_(agent), + extension_loader_(loader), + configuration_(configuration) { + } + + explicit StandardControllerServiceProvider( + std::shared_ptr<ControllerServiceMap> services, + std::shared_ptr<ProcessGroup> root_group, std::shared_ptr<Configure> configuration, + ClassLoader &loader = ClassLoader::getDefaultClassLoader()) + : ControllerServiceProvider(services), + root_group_(root_group), + agent_(0), + extension_loader_(loader), + configuration_(configuration) { + } + + explicit StandardControllerServiceProvider( + const StandardControllerServiceProvider && other) + : ControllerServiceProvider(std::move(other)), + root_group_(std::move(other.root_group_)), + agent_(std::move(other.agent_)), + extension_loader_(other.extension_loader_), + configuration_(other.configuration_) { + + } + + void 
setRootGroup(std::shared_ptr<ProcessGroup> rg) { + root_group_ = rg; + } + + void setSchedulingAgent(std::shared_ptr<minifi::SchedulingAgent> agent) { + agent_ = agent; + } + + std::shared_ptr<ControllerServiceNode> createControllerService( + const std::string &type, const std::string &id, + bool firstTimeAdded) { + + std::shared_ptr<ControllerService> new_controller_service = + extension_loader_.instantiate<ControllerService>(type, id); + + if (nullptr == new_controller_service) { + return nullptr; + } + + std::shared_ptr<ControllerServiceNode> new_service_node = std::make_shared< + StandardControllerServiceNode>( + new_controller_service, + std::static_pointer_cast<ControllerServiceProvider>(shared_from_this()), + id, configuration_); + + controller_map_->put(id, new_service_node); + return new_service_node; + + } + + + void enableControllerService( + std::shared_ptr<ControllerServiceNode> &serviceNode) { + if (serviceNode->canEnable()) { + agent_->enableControllerService(serviceNode); + } + } + + + virtual void enableAllControllerServices() { + logger_->log_info("Enabling %d controller services", + controller_map_->getAllControllerServices().size()); + for (auto service : controller_map_->getAllControllerServices()) { + + if (service->canEnable()) { + logger_->log_info("Enabling %s", service->getName()); + agent_->enableControllerService(service); + } else { + logger_->log_info("Could not enable %s", service->getName()); + } + } + } + + + void enableControllerServices( + std::vector<std::shared_ptr<ControllerServiceNode>> serviceNodes) { + for (auto node : serviceNodes) { + enableControllerService(node); + } + } + + + void disableControllerService( + std::shared_ptr<ControllerServiceNode> &serviceNode) { + if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) { + agent_->disableControllerService(serviceNode); + } + } + + + void verifyCanStopReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + } + + + 
std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = + findLinkedComponents(serviceNode); + for (auto ref : references) { + agent_->disableControllerService(ref); + } + return references; + } + + + void verifyCanDisableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = + findLinkedComponents(serviceNode); + for (auto ref : references) { + if (!ref->canEnable()) { + logger_->log_info("Cannot disable %s", ref->getName()); + } + } + } + + + virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = + findLinkedComponents(serviceNode); + for (auto ref : references) { + agent_->disableControllerService(ref); + } + + return references; + } + + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = + findLinkedComponents(serviceNode); + for (auto ref : references) { + agent_->enableControllerService(ref); + } + return references; + } + + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = + findLinkedComponents(serviceNode); + for (auto ref : references) { + agent_->enableControllerService(ref); + } + return references; + } + + 
protected: + + bool canEdit() { + return false; + } + + std::shared_ptr<minifi::SchedulingAgent> agent_; + + ClassLoader &extension_loader_; + + std::shared_ptr<Configure> configuration_; + + std::shared_ptr<ProcessGroup> root_group_; + +}; + +} /* namespace controller */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDStandardControllerServiceProvider_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/core/yaml/YamlConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h index 793cdb9..6226a4e 100644 --- a/libminifi/include/core/yaml/YamlConfiguration.h +++ b/libminifi/include/core/yaml/YamlConfiguration.h @@ -20,6 +20,7 @@ #include "core/ProcessorConfig.h" #include "yaml-cpp/yaml.h" +#include "processors/LoadProcessors.h" #include "../FlowConfiguration.h" #include "Site2SiteClientProtocol.h" #include <string> @@ -36,18 +37,21 @@ namespace core { #define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller" #define CONFIG_YAML_PROCESSORS_KEY "Processors" #define CONFIG_YAML_CONNECTIONS_KEY "Connections" +#define CONFIG_YAML_CONTROLLER_SERVICES_KEY "Controller Services" #define CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY "Remote Processing Groups" #define CONFIG_YAML_PROVENANCE_REPORT_KEY "Provenance Reporting" class YamlConfiguration : public FlowConfiguration { public: - YamlConfiguration(std::shared_ptr<core::Repository> repo, + explicit YamlConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<io::StreamFactory> stream_factory, + std::shared_ptr<Configure> configuration, const std::string path = DEFAULT_FLOW_YAML_FILE_NAME) - : FlowConfiguration(repo, flow_file_repo, stream_factory, path) { - 
stream_factory_ = stream_factory; + : FlowConfiguration(repo, flow_file_repo, stream_factory, configuration, + path) { + stream_factory_ = stream_factory; if (IsNullOrEmpty(config_path_)) { config_path_ = DEFAULT_FLOW_YAML_FILE_NAME; } @@ -67,7 +71,8 @@ class YamlConfiguration : public FlowConfiguration { * @return the root ProcessGroup node of the flow * configuration tree */ - std::unique_ptr<core::ProcessGroup> getRoot(const std::string &yamlConfigStr) { + std::unique_ptr<core::ProcessGroup> getRoot( + const std::string &yamlConfigStr) { YAML::Node rootYamlNode = YAML::LoadFile(yamlConfigStr); return getRoot(&rootYamlNode); } @@ -106,9 +111,14 @@ class YamlConfiguration : public FlowConfiguration { YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY]; YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY]; YAML::Node connectionsNode = rootYaml[CONFIG_YAML_CONNECTIONS_KEY]; - YAML::Node remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY]; - YAML::Node provenanceReportNode = rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY]; + YAML::Node controllerServiceNode = + rootYaml[CONFIG_YAML_CONTROLLER_SERVICES_KEY]; + YAML::Node remoteProcessingGroupsNode = + rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY]; + YAML::Node provenanceReportNode = + rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY]; + parseControllerServices(&controllerServiceNode); // Create the root process group core::ProcessGroup * root = parseRootProcessGroupYaml(flowControllerNode); parseProcessorNodeYaml(processorsNode, root); @@ -116,6 +126,15 @@ class YamlConfiguration : public FlowConfiguration { parseConnectionYaml(&connectionsNode, root); parseProvenanceReportingYaml(&provenanceReportNode, root); + // set the controller services into the root group. 
+ for (auto controller_service : controller_services_ + ->getAllControllerServices()) { + root->addControllerService(controller_service->getName(), + controller_service); + root->addControllerService(controller_service->getUUIDStr(), + controller_service); + } + return std::unique_ptr<core::ProcessGroup>(root); } @@ -147,7 +166,6 @@ class YamlConfiguration : public FlowConfiguration { void parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, TransferDirection direction); - /** * Parses the root level YAML node for the flow configuration and * returns a ProcessGroup containing the tree of flow configuration @@ -158,6 +176,17 @@ class YamlConfiguration : public FlowConfiguration { */ core::ProcessGroup *parseRootProcessGroupYaml(YAML::Node rootNode); + // Process Property YAML + void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, + std::shared_ptr<core::Processor> processor); + /** + * Parse controller services + * @param controllerServicesNode controller services YAML node. + * @param parent parent process group. + */ + void parseControllerServices(YAML::Node *controllerServicesNode); + // Process connection YAML + /** * Parses the Connections section of a configuration YAML. * The resulting Connections are added to the parent ProcessGroup. @@ -200,8 +229,9 @@ class YamlConfiguration : public FlowConfiguration { * @param propertiesNode the YAML::Node containing the properties * @param processor the Processor to which to add the resulting properties */ - void parsePropertiesNodeYaml(YAML::Node *propertiesNode, - std::shared_ptr<core::Processor> processor); + void parsePropertiesNodeYaml( + YAML::Node *propertiesNode, + std::shared_ptr<core::ConfigurableComponent> processor); /** * A helper function for parsing or generating optional id fields. 
@@ -219,7 +249,8 @@ class YamlConfiguration : public FlowConfiguration { * is optional and defaults to 'id' * @return the parsed or generated UUID string */ - std::string getOrGenerateId(YAML::Node *yamlNode, const std::string &idField = "id"); + std::string getOrGenerateId(YAML::Node *yamlNode, const std::string &idField = + "id"); /** * This is a helper function for verifying the existence of a required @@ -239,8 +270,7 @@ class YamlConfiguration : public FlowConfiguration { * @throws std::invalid_argument if the required field 'fieldName' is * not present in 'yamlNode' */ - void checkRequiredField(YAML::Node *yamlNode, - const std::string &fieldName, + void checkRequiredField(YAML::Node *yamlNode, const std::string &fieldName, const std::string &yamlSection = "", const std::string &errorMessage = ""); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/io/StreamFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/StreamFactory.h b/libminifi/include/io/StreamFactory.h index db4625a..3faee45 100644 --- a/libminifi/include/io/StreamFactory.h +++ b/libminifi/include/io/StreamFactory.h @@ -30,7 +30,11 @@ namespace io { class AbstractStreamFactory { public: - virtual std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) = 0; + virtual ~AbstractStreamFactory() { + } + + virtual std::unique_ptr<Socket> createSocket(const std::string &host, + const uint16_t port) = 0; }; /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/io/TLSSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/TLSSocket.h b/libminifi/include/io/TLSSocket.h new file mode 100644 index 0000000..011a012 --- /dev/null +++ b/libminifi/include/io/TLSSocket.h @@ -0,0 +1,197 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ +#define LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ + +#include <cstdint> +#include "ClientSocket.h" +#include <atomic> +#include <mutex> + +#include "properties/Configure.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +#include <openssl/ssl.h> +#include <openssl/err.h> + +#define TLS_ERROR_CONTEXT 1 +#define TLS_ERROR_PEM_MISSING 2 +#define TLS_ERROR_CERT_MISSING 3 +#define TLS_ERROR_KEY_ERROR 4 +#define TLS_ERROR_CERT_ERROR 5 + +class TLSContext { + + public: + + /** + * Build an instance, creating a memory fence, which + * allows us to avoid locking. This is tantamount to double checked locking. 
+ * @returns new TLSContext; + */ + static TLSContext *getInstance() { + TLSContext* atomic_context = context_instance.load( + std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_acquire); + if (atomic_context == nullptr) { + std::lock_guard<std::mutex> lock(context_mutex); + atomic_context = context_instance.load(std::memory_order_relaxed); + if (atomic_context == nullptr) { + atomic_context = new TLSContext(); + atomic_context->initialize(); + std::atomic_thread_fence(std::memory_order_release); + context_instance.store(atomic_context, std::memory_order_relaxed); + } + } + return atomic_context; + } + + virtual ~TLSContext() { + if (0 != ctx) + SSL_CTX_free(ctx); + } + + SSL_CTX *getContext() { + return ctx; + } + + short getError() { + return error_value; + } + + short initialize(); + + private: + /** PEM pass phrase callback: copies the content of the configured pass phrase file into buf (never more than size bytes) and returns the number of bytes copied, or 0 when no pass phrase is available. */ + static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) { + std::string passphrase; + if (buf == nullptr || size <= 0) return 0; // no room to write anything + if (Configure::getConfigure()->get( + Configure::nifi_security_client_pass_phrase, passphrase)) { + // the configured value is the path of the file that holds the pass phrase + std::ifstream file(passphrase.c_str(), std::ifstream::in); + if (!file.good()) { + memset(buf, 0x00, size); + return 0; + } + // drop the final character (presumably a trailing newline) and never copy more than size bytes + std::string password((std::istreambuf_iterator<char>(file)), + std::istreambuf_iterator<char>()); + file.close(); + memset(buf, 0x00, size); + size_t length = password.empty() ? 0 : password.length() - 1; + if (length > static_cast<size_t>(size)) length = static_cast<size_t>(size); + memcpy(buf, password.c_str(), length); + return static_cast<int>(length); + } + return 0; + } + + TLSContext(); + + std::shared_ptr<logging::Logger> logger_; + Configure *configuration; + SSL_CTX *ctx; + + short error_value; + + static std::atomic<TLSContext*> context_instance; + static std::mutex context_mutex; +}; + +class TLSSocket : public Socket { + public: + + /** + * Constructor that accepts host name, port and listeners.
With this + * constructor we will be creating a server socket + * @param hostname our host name + * @param port connecting port + * @param listeners number of listeners in the queue + */ + explicit TLSSocket(const std::string &hostname, const uint16_t port, + const uint16_t listeners); + + /** + * Constructor that creates a client socket. + * @param hostname hostname we are connecting to. + * @param port port we are connecting to. + */ + explicit TLSSocket(const std::string &hostname, const uint16_t port); + + /** + * Move constructor. + */ + explicit TLSSocket(const TLSSocket &&); + + virtual ~TLSSocket(); + + /** + * Initializes the socket + * @return result of the creation operation. + */ + short initialize(); + + /** + * Attempt to select the socket file descriptor + * @param msec timeout interval to wait + * @returns file descriptor + */ + virtual short select_descriptor(const uint16_t msec); + + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(uint8_t *buf, int buflen); + + /** + * Write value to the stream using std::vector + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + int writeData(std::vector<uint8_t> &buf, int buflen); + + /** + * Write value to the stream using uint8_t ptr + * @param value incoming buffer + * @param size buffer to write + * + */ + int writeData(uint8_t *value, int size); + + protected: + + SSL* ssl; + +}; + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_IO_TLSSOCKET_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/io/validation.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/validation.h b/libminifi/include/io/validation.h index 9dd1b8a..a5e1bc5 100644 --- a/libminifi/include/io/validation.h +++
b/libminifi/include/io/validation.h @@ -56,7 +56,7 @@ static auto IsNullOrEmpty( template<typename T> static auto IsNullOrEmpty( T *object) -> typename std::enable_if<size_function_functor_checker<T>::has_size_function==1, bool>::type { - return (0 == object || object->size() == 0); + return (nullptr == object || object->size() == 0); } /** @@ -65,8 +65,18 @@ static auto IsNullOrEmpty( template<typename T> static auto IsNullOrEmpty( T *object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type { - return (0 == object); + return (nullptr == object); } + +/** + * Determines if the shared pointer is null + */ +template<typename T> +static auto IsNullOrEmpty( + std::shared_ptr<T> object) -> typename std::enable_if<not size_function_functor_checker<T>::has_size_function , bool>::type { + return (nullptr == object || nullptr == object.get()); +} + /** * Determines if the variable is null or strlen(str) == 0 */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/AppendHostInfo.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/AppendHostInfo.h b/libminifi/include/processors/AppendHostInfo.h index a16dff3..d33c717 100644 --- a/libminifi/include/processors/AppendHostInfo.h +++ b/libminifi/include/processors/AppendHostInfo.h @@ -25,6 +25,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" +#include "core/Resource.h" namespace org { namespace apache { @@ -70,6 +71,9 @@ class AppendHostInfo : public core::Processor { std::shared_ptr<logging::Logger> logger_; }; +REGISTER_RESOURCE(AppendHostInfo); + + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/ExecuteProcess.h ---------------------------------------------------------------------- diff --git
a/libminifi/include/processors/ExecuteProcess.h b/libminifi/include/processors/ExecuteProcess.h index f74f489..dbf2d15 100644 --- a/libminifi/include/processors/ExecuteProcess.h +++ b/libminifi/include/processors/ExecuteProcess.h @@ -35,6 +35,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" +#include "core/Resource.h" namespace org { namespace apache { @@ -115,6 +116,8 @@ class ExecuteProcess : public core::Processor { pid_t _pid; }; +REGISTER_RESOURCE(ExecuteProcess); + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/GenerateFlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GenerateFlowFile.h b/libminifi/include/processors/GenerateFlowFile.h index d15a02c..2f24e64 100644 --- a/libminifi/include/processors/GenerateFlowFile.h +++ b/libminifi/include/processors/GenerateFlowFile.h @@ -24,6 +24,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" +#include "core/Resource.h" namespace org { namespace apache { @@ -89,6 +90,8 @@ class GenerateFlowFile : public core::Processor { uint64_t _dataSize; }; +REGISTER_RESOURCE(GenerateFlowFile); + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/GetFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h index 5345404..e25e7db 100644 --- a/libminifi/include/processors/GetFile.h +++ b/libminifi/include/processors/GetFile.h @@ -23,6 +23,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" +#include "core/Resource.h" namespace org { namespace apache { @@ -134,6 +135,8 @@ class GetFile : 
public core::Processor { }; +REGISTER_RESOURCE(GetFile); + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/InvokeHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/InvokeHTTP.h b/libminifi/include/processors/InvokeHTTP.h index 789b3b5..ca4fef6 100644 --- a/libminifi/include/processors/InvokeHTTP.h +++ b/libminifi/include/processors/InvokeHTTP.h @@ -28,6 +28,8 @@ #include "core/ProcessSession.h" #include "core/Core.h" #include "core/Property.h" +#include "core/Resource.h" +#include "controllers/SSLContextService.h" #include "utils/ByteInputCallBack.h" namespace org { @@ -109,7 +111,8 @@ class InvokeHTTP : public core::Processor { connect_timeout_(20000), penalize_no_retry_(false), read_timeout_(20000), - always_output_response_(false) { + always_output_response_(false), + ssl_context_service_(nullptr) { curl_global_init(CURL_GLOBAL_DEFAULT); } // Destructor @@ -161,6 +164,23 @@ class InvokeHTTP : public core::Processor { protected: /** + * Configures the SSL Context. Relies on the Context service and OpenSSL's installation + */ + static CURLcode configure_ssl_context(CURL *curl, void *ctx, void *param); + + /** + * Determines if a secure connection is required + * @param url url we will be connecting to + * @returns true if secure connection is allowed/required + */ + bool isSecure(const std::string &url); + + /** + * Configures a secure connection + */ + void configure_secure_connection(CURL *http_session); + + /** * Generate a transaction ID * @return transaction ID string. 
*/ @@ -190,13 +210,17 @@ class InvokeHTTP : public core::Processor { void route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, - bool isSuccess, int statusCode); + bool isSuccess, + int statusCode); /** * Determine if we should emit a new flowfile based on our activity * @param method method type * @return result of the evaluation. */ bool emitFlowFile(const std::string &method); + + std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; + CURLcode res; // http method @@ -219,6 +243,8 @@ class InvokeHTTP : public core::Processor { bool penalize_no_retry_; }; +REGISTER_RESOURCE(InvokeHTTP) + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h index 69432be..586a699 100644 --- a/libminifi/include/processors/ListenHTTP.h +++ b/libminifi/include/processors/ListenHTTP.h @@ -29,6 +29,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" +#include "core/Resource.h" namespace org { namespace apache { @@ -105,8 +106,6 @@ class ListenHTTP : public core::Processor { const struct mg_request_info *_reqInfo; }; - protected: - private: // Logger std::shared_ptr<logging::Logger> _logger; @@ -115,6 +114,9 @@ class ListenHTTP : public core::Processor { std::unique_ptr<Handler> _handler; }; + +REGISTER_RESOURCE(ListenHTTP); + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/ListenSyslog.h ---------------------------------------------------------------------- diff --git 
a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h index cbbdf41..4e642e8 100644 --- a/libminifi/include/processors/ListenSyslog.h +++ b/libminifi/include/processors/ListenSyslog.h @@ -36,6 +36,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" +#include "core/Resource.h" namespace org { namespace apache { @@ -206,6 +207,8 @@ class ListenSyslog : public core::Processor { char _buffer[2048]; }; +REGISTER_RESOURCE(ListenSyslog); + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/LoadProcessors.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/LoadProcessors.h b/libminifi/include/processors/LoadProcessors.h new file mode 100644 index 0000000..7a16773 --- /dev/null +++ b/libminifi/include/processors/LoadProcessors.h @@ -0,0 +1,34 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_ +#define LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_ + +#include "core/Core.h" +#include "core/Resource.h" + +#include "AppendHostInfo.h" +#include "ExecuteProcess.h" +#include "GenerateFlowFile.h" +#include "GetFile.h" +#include "ListenHTTP.h" +#include "LogAttribute.h" +#include "PutFile.h" +#include "TailFile.h" + + +#endif /* LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/LogAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h index dcc802d..56864c7 100644 --- a/libminifi/include/processors/LogAttribute.h +++ b/libminifi/include/processors/LogAttribute.h @@ -24,6 +24,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" +#include "core/Resource.h" namespace org { namespace apache { @@ -120,6 +121,8 @@ class LogAttribute : public core::Processor { std::shared_ptr<logging::Logger> logger_; }; +REGISTER_RESOURCE(LogAttribute); + } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/PutFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h index cc5dfca..a51f6b5 100644 --- a/libminifi/include/processors/PutFile.h +++ b/libminifi/include/processors/PutFile.h @@ -24,6 +24,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" +#include "core/Resource.h" namespace org { namespace apache { @@ -103,6 +104,8 @@ class PutFile : public core::Processor { const std::string &tmpFile, const std::string &destFile); }; +REGISTER_RESOURCE(PutFile); + } /* namespace processors */ } /* 
namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/processors/TailFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h index c6349a0..ac7db5a 100644 --- a/libminifi/include/processors/TailFile.h +++ b/libminifi/include/processors/TailFile.h @@ -24,6 +24,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" +#include "core/Resource.h" namespace org { namespace apache { @@ -73,8 +74,9 @@ class TailFile : public core::Processor { std::string _stateFile; // State related to the tailed file std::string _currentTailFileName; - uint64_t _currentTailFilePosition; + // determine if state is recovered; bool _stateRecovered; + uint64_t _currentTailFilePosition; uint64_t _currentTailFileCreatedTime; static const int BUFFER_SIZE = 512; @@ -89,6 +91,8 @@ class TailFile : public core::Processor { }; +REGISTER_RESOURCE(TailFile); + // Matched File Item for Roll over check typedef struct { std::string fileName; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index cb1b412..4119edd 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -28,6 +28,7 @@ #include <iostream> #include <fstream> #include "core/Core.h" +#include "utils/StringUtils.h" #include "core/logging/Logger.h" namespace org { @@ -38,7 +39,9 @@ namespace minifi { class Configure { public: // nifi.flow.configuration.file + static const char *nifi_default_directory; static const char *nifi_flow_configuration_file; + static const char *nifi_flow_engine_threads; static const char *nifi_administrative_yield_duration; 
static const char *nifi_bored_yield_duration; static const char *nifi_graceful_shutdown_seconds; @@ -80,7 +83,7 @@ class Configure { // Set the config value void set(std::string key, std::string value) { std::lock_guard<std::mutex> lock(mutex_); - properties_[key] = value; + properties_[key] = std::string(value.c_str()); } // Check whether the config value existed bool has(std::string key) { @@ -89,6 +92,12 @@ class Configure { } // Get the config value bool get(std::string key, std::string &value); + + /** + * Returns the configuration value for key parsed as an integer. + * @return value corresponding to key, or default_value if the key is not set. + */ + int getInt(const std::string &key, int default_value); // Parse one line in configure file like key=value void parseConfigureFileLine(char *buf); // Load Configure File http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/include/utils/ThreadPool.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 7508900..e3c15d8 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -35,58 +35,54 @@ namespace utils { * purpose: Provides a wrapper for the functor * and returns a future based on the template argument.
*/ -template< typename T> -class Worker{ -public: - explicit Worker(std::function<T()> &task) : task(task) - { +template<typename T> +class Worker { + public: + explicit Worker(std::function<T()> &task) + : task(task) { promise = std::make_shared<std::promise<T>>(); } /** * Move constructor for worker tasks */ - Worker(Worker &&other) : task (std::move(other.task)), - promise(other.promise) - { + Worker(Worker &&other) + : task(std::move(other.task)), + promise(other.promise) { } - /** * Runs the task and takes the output from the funtor * setting the result into the promise */ - void run() - { + void run() { T result = task(); promise->set_value(result); } - Worker<T>(const Worker<T>&) = delete; - Worker<T>& operator = (const Worker<T>&) = delete; + Worker<T>(const Worker<T>&) = delete; + Worker<T>& operator =(const Worker<T>&) = delete; - Worker<T>& operator = (Worker<T>&&) ; + Worker<T>& operator =(Worker<T> &&); std::shared_ptr<std::promise<T>> getPromise(); -private: - std::function<T()> task; - std::shared_ptr<std::promise<T>> promise; + private: + std::function<T()> task; + std::shared_ptr<std::promise<T>> promise; }; -template< typename T> -Worker<T>& Worker<T>::operator = (Worker<T>&& other) -{ - task = std::move(other.task); - promise = other.promise; - return *this; +template<typename T> +Worker<T>& Worker<T>::operator =(Worker<T> && other) { + task = std::move(other.task); + promise = other.promise; + return *this; } - template<typename T> -std::shared_ptr<std::promise<T>> Worker<T>::getPromise(){ - return promise; - } +std::shared_ptr<std::promise<T>> Worker<T>::getPromise() { + return promise; +} /** * Thread pool @@ -95,171 +91,190 @@ std::shared_ptr<std::promise<T>> Worker<T>::getPromise(){ * Design: Locked control over a manager thread that controls the worker threads */ template<typename T> -class ThreadPool - { - public: - ThreadPool(int max_worker_threads, bool daemon_threads=false) : max_worker_threads_(max_worker_threads) - 
,daemon_threads_(daemon_threads), running_(false){ - current_workers_ = 0; - } - virtual ~ThreadPool(){ - shutdown(); - } - - /** - * Execute accepts a worker task and returns - * a future - * @param task this thread pool will subsume ownership of - * the worker task - * @return future with the impending result. - */ - std::future<T> execute(Worker<T> &&task); - /** - * Starts the Thread Pool - */ - void start(); - /** - * Shutdown the thread pool and clear any - * currently running activities - */ - void shutdown(); - /** - * Set the max concurrent tasks. When this is done - * we must start and restart the thread pool if - * the number of tasks is less than the currently configured number - */ - void setMaxConcurrentTasks(uint16_t max) - { - std::lock_guard<std::recursive_mutex> lock(manager_mutex_); - if (running_) - { - shutdown(); - } - max_worker_threads_= max; - if (!running_) - start(); - } - - protected: - - /** - * Drain will notify tasks to stop following notification - */ - void drain() - { - while(current_workers_ > 0) - { - tasks_available_.notify_one(); - } - } - // determines if threads are detached - bool daemon_threads_; - // max worker threads - int max_worker_threads_; - // current worker tasks. 
- std::atomic<int> current_workers_; - // thread queue - std::vector<std::thread> thread_queue_; - // manager thread - std::thread manager_thread_; - // atomic running boolean - std::atomic<bool> running_; - // worker queue of worker objects - std::queue<Worker<T>> worker_queue_; - // notification for available work - std::condition_variable tasks_available_; - // manager mutex - std::recursive_mutex manager_mutex_; - // work queue mutex - std::mutex worker_queue_mutex_; - - /** - * Call for the manager to start worker threads - */ - void startWorkers(); - - /** - * Runs worker tasks - */ - void run_tasks(); - }; +class ThreadPool { + public: + + ThreadPool(int max_worker_threads = 8, bool daemon_threads = false) + : max_worker_threads_(max_worker_threads), + daemon_threads_(daemon_threads), + running_(false) { + current_workers_ = 0; + } + + ThreadPool(const ThreadPool<T> &&other) + : max_worker_threads_(std::move(other.max_worker_threads_)), + daemon_threads_(std::move(other.daemon_threads_)), + running_(false) { + current_workers_ = 0; + } + virtual ~ThreadPool() { + shutdown(); + } + + /** + * Execute accepts a worker task and returns + * a future + * @param task this thread pool will subsume ownership of + * the worker task + * @return future with the impending result. + */ + std::future<T> execute(Worker<T> &&task); + /** + * Starts the Thread Pool + */ + void start(); + /** + * Shutdown the thread pool and clear any + * currently running activities + */ + void shutdown(); + /** + * Set the max concurrent tasks. 
When this is done + * we must start and restart the thread pool if + * the number of tasks is less than the currently configured number + */ + void setMaxConcurrentTasks(uint16_t max) { + std::lock_guard<std::recursive_mutex> lock(manager_mutex_); + if (running_) { + shutdown(); + } + max_worker_threads_ = max; + if (!running_) + start(); + } + + ThreadPool<T> operator=(const ThreadPool<T> &other) = delete; + ThreadPool(const ThreadPool<T> &other) = delete; + + ThreadPool<T> &operator=(ThreadPool<T> &&other) { + std::lock_guard<std::recursive_mutex> lock(manager_mutex_); + if (other.running_) { + other.shutdown(); + } + if (running_) { + shutdown(); + } + max_worker_threads_ = std::move(other.max_worker_threads_); + daemon_threads_ = std::move(other.daemon_threads_); + current_workers_ = 0; + + thread_queue_ = std::move(other.thread_queue_); + worker_queue_ = std::move(other.worker_queue_); + if (!running_) { + start(); + } + return *this; + } + + protected: + + /** + * Drain will notify tasks to stop following notification + */ + void drain() { + while (current_workers_ > 0) { + tasks_available_.notify_one(); + } + } +// determines if threads are detached + bool daemon_threads_; +// max worker threads + int max_worker_threads_; +// current worker tasks. 
+ std::atomic<int> current_workers_; +// thread queue + std::vector<std::thread> thread_queue_; +// manager thread + std::thread manager_thread_; +// atomic running boolean + std::atomic<bool> running_; +// worker queue of worker objects + std::queue<Worker<T>> worker_queue_; +// notification for available work + std::condition_variable tasks_available_; +// manager mutex + std::recursive_mutex manager_mutex_; +// work queue mutex + std::mutex worker_queue_mutex_; + + /** + * Call for the manager to start worker threads + */ + void startWorkers(); + + /** + * Runs worker tasks + */ + void run_tasks(); +} +; template<typename T> -std::future<T> ThreadPool<T>::execute(Worker<T> &&task){ +std::future<T> ThreadPool<T>::execute(Worker<T> &&task) { std::unique_lock<std::mutex> lock(worker_queue_mutex_); bool wasEmpty = worker_queue_.empty(); std::future<T> future = task.getPromise()->get_future(); worker_queue_.push(std::move(task)); - if (wasEmpty) - { - tasks_available_.notify_one(); + if (wasEmpty) { + tasks_available_.notify_one(); } return future; } -template< typename T> -void ThreadPool<T>::startWorkers(){ - for (int i = 0; i < max_worker_threads_; i++) - { - thread_queue_.push_back( std::thread(&ThreadPool::run_tasks, this)); - current_workers_++; - } +template<typename T> +void ThreadPool<T>::startWorkers() { + for (int i = 0; i < max_worker_threads_; i++) { + thread_queue_.push_back(std::thread(&ThreadPool::run_tasks, this)); + current_workers_++; + } - if (daemon_threads_) - { - for (auto &thread : thread_queue_){ - thread.detach(); - } - } - for (auto &thread : thread_queue_) - { - if (thread.joinable()) - thread.join(); + if (daemon_threads_) { + for (auto &thread : thread_queue_) { + thread.detach(); } + } + for (auto &thread : thread_queue_) { + if (thread.joinable()) + thread.join(); + } } -template< typename T> -void ThreadPool<T>::run_tasks() -{ - while (running_.load()) - { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - if 
(worker_queue_.empty()) - { +template<typename T> +void ThreadPool<T>::run_tasks() { + while (running_.load()) { + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + if (worker_queue_.empty()) { - tasks_available_.wait(lock); - } + tasks_available_.wait(lock); + } - if (!running_.load()) - break; + if (!running_.load()) + break; - if (worker_queue_.empty()) - continue; - Worker<T> task = std::move(worker_queue_.front()); - worker_queue_.pop(); - task.run(); - } - current_workers_--; + if (worker_queue_.empty()) + continue; + Worker<T> task = std::move(worker_queue_.front()); + worker_queue_.pop(); + task.run(); + } + current_workers_--; } -template< typename T> - void ThreadPool<T>::start() -{ +template<typename T> +void ThreadPool<T>::start() { std::lock_guard<std::recursive_mutex> lock(manager_mutex_); - if (!running_) - { + if (!running_) { running_ = true; manager_thread_ = std::thread(&ThreadPool::startWorkers, this); } } -template< typename T> -void ThreadPool<T>::shutdown(){ +template<typename T> +void ThreadPool<T>::shutdown() { std::lock_guard<std::recursive_mutex> lock(manager_mutex_); - if (running_.load()) - { + if (running_.load()) { running_.store(false); @@ -267,21 +282,19 @@ void ThreadPool<T>::shutdown(){ if (manager_thread_.joinable()) manager_thread_.join(); { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - thread_queue_.clear(); - current_workers_ = 0; - while(!worker_queue_.empty()) - worker_queue_.pop(); + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + thread_queue_.clear(); + current_workers_ = 0; + while (!worker_queue_.empty()) + worker_queue_.pop(); } } } - } /* namespace utils */ } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ } /* namespace org */ - #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp 
b/libminifi/src/Configure.cpp index 95562c3..f35e88a 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ #include "properties/Configure.h" +#include <cstdlib> #include <string> #include "utils/StringUtils.h" #include "core/Core.h" @@ -25,8 +26,10 @@ namespace apache { namespace nifi { namespace minifi { +const char *Configure::nifi_default_directory = "nifi.default.directory"; const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file"; +const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads"; const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration"; const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration"; @@ -82,6 +85,17 @@ bool Configure::get(std::string key, std::string &value) { } } +int Configure::getInt(const std::string &key, int default_value) { + std::lock_guard<std::mutex> lock(mutex_); + auto it = properties_.find(key); + + if (it != properties_.end()) { + return std::atol(it->second.c_str()); + } else { + return default_value; + } +} + // Parse one line in configure file like key=value void Configure::parseConfigureFileLine(char *buf) { char *line = buf; @@ -124,8 +138,7 @@ void Configure::loadConfigureFile(const char *fileName) { if (fileName) { // perform a naive determination if this is a relative path if (fileName[0] != '/') { - adjustedFilename = adjustedFilename + getHome() + "/" - + fileName; + adjustedFilename = adjustedFilename + getHome() + "/" + fileName; } else { adjustedFilename += fileName; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/EventDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index cbb60ea..fa2171b 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp 
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -44,10 +44,10 @@ void EventDrivenSchedulingAgent::run( // Honor the yield std::this_thread::sleep_for( std::chrono::milliseconds(processor->getYieldTime())); - } else if (shouldYield && this->_boredYieldDuration > 0) { + } else if (shouldYield && this->bored_yield_duration_ > 0) { // No work to do or need to apply back pressure std::this_thread::sleep_for( - std::chrono::milliseconds(this->_boredYieldDuration)); + std::chrono::milliseconds(this->bored_yield_duration_)); } // Block until work is available http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 6785a9d..5f6e014 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -37,6 +37,7 @@ #include "core/ProcessGroup.h" #include "utils/StringUtils.h" #include "core/Core.h" +#include "core/controller/ControllerServiceProvider.h" #include "core/repository/FlowFileRepository.h" namespace org { @@ -52,7 +53,8 @@ FlowController::FlowController( std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, const std::string name, bool headless_mode) - : CoreComponent(core::getClassName<FlowController>()), + : core::controller::ControllerServiceProvider( + core::getClassName<FlowController>()), root_(nullptr), max_timer_driven_threads_(0), max_event_driven_threads_(0), @@ -61,14 +63,20 @@ FlowController::FlowController( provenance_repo_(provenance_repo), flow_file_repo_(flow_file_repo), protocol_(0), - _timerScheduler(provenance_repo_, configure), - _eventScheduler(provenance_repo_, configure), - flow_configuration_(std::move(flow_configuration)) { + controller_service_map_( + std::make_shared<core::controller::ControllerServiceMap>()), + timer_scheduler_(nullptr), + event_scheduler_(nullptr), + 
controller_service_provider_(nullptr), + flow_configuration_(std::move(flow_configuration)), + configuration_(configure) { if (provenance_repo == nullptr) throw std::runtime_error("Provenance Repo should not be null"); if (flow_file_repo == nullptr) throw std::runtime_error("Flow Repo should not be null"); - + if (IsNullOrEmpty(configuration_)) { + throw std::runtime_error("Must supply a configuration."); + } uuid_generate(uuid_); setUUID(uuid_); @@ -80,14 +88,14 @@ FlowController::FlowController( max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD; running_ = false; initialized_ = false; - root_ = NULL; + root_ = nullptr; protocol_ = new FlowControlProtocol(this, configure); if (!headless_mode) { std::string rawConfigFileString; configure->get(Configure::nifi_flow_configuration_file, - rawConfigFileString); + rawConfigFileString); if (!rawConfigFileString.empty()) { configuration_filename_ = rawConfigFileString; @@ -162,15 +170,15 @@ void FlowController::stop(bool force) { running_ = false; logger_->log_info("Stop Flow Controller"); - this->_timerScheduler.stop(); - this->_eventScheduler.stop(); + this->timer_scheduler_->stop(); + this->event_scheduler_->stop(); this->flow_file_repo_->stop(); this->provenance_repo_->stop(); // Wait for sometime for thread stop std::this_thread::sleep_for(std::chrono::milliseconds(1000)); if (this->root_) - this->root_->stopProcessing(&this->_timerScheduler, - &this->_eventScheduler); + this->root_->stopProcessing(this->timer_scheduler_.get(), + this->event_scheduler_.get()); } } @@ -221,14 +229,40 @@ void FlowController::load() { stop(true); } if (!initialized_) { + logger_->log_info("Initializing timers"); + if (nullptr == timer_scheduler_) { + timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>( + std::static_pointer_cast<core::controller::ControllerServiceProvider>( + shared_from_this()), + provenance_repo_, configuration_); + } + if (nullptr == event_scheduler_) { + event_scheduler_ = 
std::make_shared<EventDrivenSchedulingAgent>( + std::static_pointer_cast<core::controller::ControllerServiceProvider>( + shared_from_this()), + provenance_repo_, configuration_); + } logger_->log_info("Load Flow Controller from file %s", configuration_filename_.c_str()); - this->root_ = flow_configuration_->getRoot(configuration_filename_); + this->root_ = std::shared_ptr<core::ProcessGroup>( + flow_configuration_->getRoot(configuration_filename_)); + + logger_->log_info("Loaded root processor Group"); + + controller_service_provider_ = flow_configuration_ + ->getControllerServiceProvider(); + std::static_pointer_cast<core::controller::StandardControllerServiceProvider>( + controller_service_provider_)->setRootGroup(root_); + std::static_pointer_cast<core::controller::StandardControllerServiceProvider>( + controller_service_provider_)->setSchedulingAgent( + std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_)); + + logger_->log_info("Loaded controller service provider"); // Load Flow File from Repo loadFlowRepo(); - + logger_->log_info("Loaded flow repository"); initialized_ = true; } } @@ -255,15 +289,20 @@ void FlowController::reload(std::string yamlFile) { } void FlowController::loadFlowRepo() { - if (this->flow_file_repo_) { + if (this->flow_file_repo_ != nullptr) { + logger_->log_debug("Getting connection map"); std::map<std::string, std::shared_ptr<Connection>> connectionMap; if (this->root_ != nullptr) { this->root_->getConnections(connectionMap); } + logger_->log_debug("Number of connections from connectionMap %d", + connectionMap.size()); auto rep = std::static_pointer_cast<core::repository::FlowFileRepository>( flow_file_repo_); rep->setConnectionMap(connectionMap); flow_file_repo_->loadComponent(); + } else { + logger_->log_debug("Flow file repository is not set"); } } @@ -276,11 +315,12 @@ bool FlowController::start() { } else { if (!running_) { logger_->log_info("Starting Flow Controller"); - this->_timerScheduler.start(); - 
+ this->_eventScheduler.start(); + controller_service_provider_->enableAllControllerServices(); + this->timer_scheduler_->start(); + this->event_scheduler_->start(); if (this->root_ != nullptr) { - this->root_->startProcessing(&this->_timerScheduler, - &this->_eventScheduler); + this->root_->startProcessing(this->timer_scheduler_.get(), + this->event_scheduler_.get()); } running_ = true; this->protocol_->start(); @@ -291,6 +331,163 @@ bool FlowController::start() { return true; } } +/** + * Controller Service functions + * + */ + +/** + * Creates a controller service through the controller service provider impl. + * @param type class name + * @param id service identifier + * @param firstTimeAdded first time this CS was added + */ +std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService( + const std::string &type, const std::string &id, + bool firstTimeAdded) { + return controller_service_provider_->createControllerService(type, id, + firstTimeAdded); +} + +/** + * controller service provider + */ +/** + * removes controller service + * @param serviceNode service node to be removed. + */ + +void FlowController::removeControllerService( + const std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + controller_map_->removeControllerService(serviceNode); +} + +/** + * Enables the controller service + * @param serviceNode service node which will be enabled, along with linked services. + */ +void FlowController::enableControllerService( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + return controller_service_provider_->enableControllerService(serviceNode); +} + +/** + * Enables controller services + * @param serviceNodes vector of service nodes which will be enabled, along with linked services.
+ */ +void FlowController::enableControllerServices( + std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) { +} + +/** + * Disables a controller service + * @param serviceNode service node which will be disabled, along with linked services. + */ +void FlowController::disableControllerService( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + controller_service_provider_->disableControllerService(serviceNode); +} + +/** + * Gets all controller services. + */ +std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::getAllControllerServices() { + return controller_service_provider_->getAllControllerServices(); +} + +/** + * Gets controller service node specified by <code>id</code> + * @param id service identifier + * @return shared pointer to the controller service node or nullptr if it does not exist. + */ +std::shared_ptr<core::controller::ControllerServiceNode> FlowController::getControllerServiceNode( + const std::string &id) { + return controller_service_provider_->getControllerServiceNode(id); +} + +void FlowController::verifyCanStopReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +} + +/** + * Unschedules referencing components. + */ +std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::unscheduleReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + return controller_service_provider_->unscheduleReferencingComponents( + serviceNode); +} + +/** + * Verifies that referencing services can be disabled + * @param serviceNode service node whose referencing services will be checked.
+ */ +void FlowController::verifyCanDisableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + controller_service_provider_->verifyCanDisableReferencingServices( + serviceNode); +} + +/** + * Disables referencing services + * @param serviceNode service node whose referencing services will be disabled. + */ +std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::disableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + return controller_service_provider_->disableReferencingServices(serviceNode); +} + +/** + * Verifies that referencing services can be enabled + * @param serviceNode service node whose referencing services will be checked. + */ +void FlowController::verifyCanEnableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + controller_service_provider_->verifyCanEnableReferencingServices(serviceNode); +} + +/** + * Determines if the controller service specified by identifier is enabled. + */ +bool FlowController::isControllerServiceEnabled(const std::string &identifier) { + return controller_service_provider_->isControllerServiceEnabled(identifier); +} + +/** + * Enables referencing services + * @param serviceNode service node whose referencing services will be enabled. + */ +std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::enableReferencingServices( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + return controller_service_provider_->enableReferencingServices(serviceNode); +} + +/** + * Schedules referencing components + * @param serviceNode service node whose referenced components will be scheduled.
+ */ +std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::scheduleReferencingComponents( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + return controller_service_provider_->scheduleReferencingComponents( + serviceNode); +} + +/** + * Returns controller service components referenced by serviceIdentifier from the embedded + * controller service provider; + */ +std::shared_ptr<core::controller::ControllerService> FlowController::getControllerServiceForComponent( + const std::string &serviceIdentifier, const std::string &componentId) { + return controller_service_provider_->getControllerServiceForComponent( + serviceIdentifier, componentId); +} + +/** + * Enables all controller services for the provider. + */ +void FlowController::enableAllControllerServices() { + controller_service_provider_->enableAllControllerServices(); +} } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index d69ba00..fc979fd 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -20,6 +20,7 @@ #include "SchedulingAgent.h" #include <chrono> #include <thread> +#include <utility> #include <memory> #include <iostream> #include "Exception.h" @@ -39,6 +40,36 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) { return false; } +void SchedulingAgent::enableControllerService( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + + logger_->log_trace("Enabling CSN in SchedulingAgent %s", + serviceNode->getName()); + // reference the enable function from serviceNode + std::function<bool()> f_ex = [serviceNode] { + return serviceNode->enable(); + }; + // create a functor that will be submitted to the thread 
pool. + utils::Worker<bool> functor(f_ex); + // move the functor into the thread pool. While a future is returned + // we aren't terribly concerned with the result. + component_lifecycle_thread_pool_.execute(std::move(functor)); +} + +void SchedulingAgent::disableControllerService( + std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + + // reference the disable function from serviceNode + std::function<bool()> f_ex = [serviceNode] { + return serviceNode->disable(); + }; + // create a functor that will be submitted to the thread pool. + utils::Worker<bool> functor(f_ex); + // move the functor into the thread pool. While a future is returned + // we aren't terribly concerned with the result. + component_lifecycle_thread_pool_.execute(std::move(functor)); +} + bool SchedulingAgent::hasTooMuchOutGoing( std::shared_ptr<core::Processor> processor) { return processor->flowFilesOutGoingFull(); @@ -71,11 +102,11 @@ bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, processor->decrementActiveTask(); } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); - processor->yield(_administrativeYieldDuration); + processor->yield(admin_yield_duration_); processor->decrementActiveTask(); } catch (...) 
{ logger_->log_debug("Caught Exception during SchedulingAgent::onTrigger"); - processor->yield(_administrativeYieldDuration); + processor->yield(admin_yield_duration_); processor->decrementActiveTask(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/ThreadedSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 65e7531..7e9bb03 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -39,30 +39,28 @@ void ThreadedSchedulingAgent::schedule( std::shared_ptr<core::Processor> processor) { std::lock_guard<std::mutex> lock(mutex_); - _administrativeYieldDuration = 0; + admin_yield_duration_ = 0; std::string yieldValue; if (configure_->get(Configure::nifi_administrative_yield_duration, yieldValue)) { core::TimeUnit unit; - if (core::Property::StringToTime(yieldValue, _administrativeYieldDuration, - unit) - && core::Property::ConvertTimeUnitToMS(_administrativeYieldDuration, - unit, - _administrativeYieldDuration)) { + if (core::Property::StringToTime(yieldValue, admin_yield_duration_, unit) + && core::Property::ConvertTimeUnitToMS(admin_yield_duration_, unit, + admin_yield_duration_)) { logger_->log_debug("nifi_administrative_yield_duration: [%d] ms", - _administrativeYieldDuration); + admin_yield_duration_); } } - _boredYieldDuration = 0; + bored_yield_duration_ = 0; if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) { core::TimeUnit unit; - if (core::Property::StringToTime(yieldValue, _boredYieldDuration, unit) - && core::Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, - _boredYieldDuration)) { + if (core::Property::StringToTime(yieldValue, bored_yield_duration_, unit) + && core::Property::ConvertTimeUnitToMS(bored_yield_duration_, unit, + bored_yield_duration_)) { logger_->log_debug("nifi_bored_yield_duration: [%d] 
ms", - _boredYieldDuration); + bored_yield_duration_); } } @@ -82,8 +80,8 @@ void ThreadedSchedulingAgent::schedule( } core::ProcessorNode processor_node(processor); - auto processContext = std::make_shared<core::ProcessContext>(processor_node, - repo_); + auto processContext = std::make_shared<core::ProcessContext>( + processor_node, controller_service_provider_, repo_); auto sessionFactory = std::make_shared<core::ProcessSessionFactory>( processContext.get()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/src/TimerDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 8d10658..8610e64 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -40,10 +40,10 @@ void TimerDrivenSchedulingAgent::run( // Honor the yield std::this_thread::sleep_for( std::chrono::milliseconds(processor->getYieldTime())); - } else if (shouldYield && this->_boredYieldDuration > 0) { + } else if (shouldYield && this->bored_yield_duration_ > 0) { // No work to do or need to apply back pressure std::this_thread::sleep_for( - std::chrono::milliseconds(this->_boredYieldDuration)); + std::chrono::milliseconds(this->bored_yield_duration_)); } std::this_thread::sleep_for( std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
