Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 83fa06e2a -> 2e71beb9a
MINIFICPP-118 Added dynamic properties support This closes #261. Signed-off-by: Bin Qiu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2e71beb9 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2e71beb9 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2e71beb9 Branch: refs/heads/master Commit: 2e71beb9ae7b63ed2e77e7cdb74f17e3a2fda3f9 Parents: 83fa06e Author: Andrew I. Christianson <[email protected]> Authored: Mon Feb 5 17:01:36 2018 -0500 Committer: Bin Qiu <[email protected]> Committed: Thu Feb 8 16:16:34 2018 -0800 ---------------------------------------------------------------------- .../expression-language/ProcessContextExpr.cpp | 13 +++ .../noop/ProcessContextExprNoOp.cpp | 5 ++ libminifi/include/core/ConfigurableComponent.h | 48 +++++++++++ libminifi/include/core/ProcessContext.h | 10 +++ libminifi/include/core/Processor.h | 4 + libminifi/include/core/ProcessorNode.h | 4 + .../include/core/controller/ControllerService.h | 5 ++ .../core/controller/ControllerServiceNode.h | 4 + .../core/controller/ControllerServiceProvider.h | 4 + libminifi/src/core/ConfigurableComponent.cpp | 79 ++++++++++++++++- libminifi/src/core/yaml/YamlConfiguration.cpp | 30 +++++-- libminifi/test/unit/DynamicPropertyTests.cpp | 91 ++++++++++++++++++++ libminifi/test/unit/YamlConfigurationTests.cpp | 37 ++++++++ 13 files changed, 325 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/extensions/expression-language/ProcessContextExpr.cpp ---------------------------------------------------------------------- diff --git a/extensions/expression-language/ProcessContextExpr.cpp b/extensions/expression-language/ProcessContextExpr.cpp index 0748e2d..ef4bc4b 100644 --- a/extensions/expression-language/ProcessContextExpr.cpp +++ b/extensions/expression-language/ProcessContextExpr.cpp @@ -36,6 +36,19 @@ bool ProcessContext::getProperty(const std::string &name, std::string &value, return true; } +bool ProcessContext::getDynamicProperty(const std::string &name, std::string &value, + const std::shared_ptr<FlowFile> &flow_file) { + if (expressions_.find(name) == expressions_.end()) { + std::string expression_str; + getDynamicProperty(name, expression_str); + logger_->log_debug("Compiling expression for %s/%s: %s", getProcessorNode()->getName(), name, expression_str); + expressions_.emplace(name, expression::compile(expression_str)); + } + + value = expressions_[name]({flow_file}); + return true; +} + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/extensions/expression-language/noop/ProcessContextExprNoOp.cpp ---------------------------------------------------------------------- diff --git a/extensions/expression-language/noop/ProcessContextExprNoOp.cpp b/extensions/expression-language/noop/ProcessContextExprNoOp.cpp index fb34b15..6d70577 100644 --- a/extensions/expression-language/noop/ProcessContextExprNoOp.cpp +++ b/extensions/expression-language/noop/ProcessContextExprNoOp.cpp @@ -28,6 +28,11 @@ bool ProcessContext::getProperty(const std::string &name, std::string &value, return getProperty(name, value); } +bool ProcessContext::getDynamicProperty(const std::string &name, std::string &value, + const std::shared_ptr<FlowFile> &flow_file) { + return getDynamicProperty(name, value); +} + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/include/core/ConfigurableComponent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h index a19abc7..fef78e9 100644 --- a/libminifi/include/core/ConfigurableComponent.h +++ b/libminifi/include/core/ConfigurableComponent.h @@ -25,6 +25,8 @@ #include <memory> #include <set> +#define DEFAULT_DYNAMIC_PROPERTY_DESC "Dynamic Property" + #include "logging/Logger.h" #include "Property.h" @@ -90,6 +92,47 @@ class __attribute__((visibility("default"))) ConfigurableComponent { * @return result of set operation. */ + /** + * Gets whether or not this processor supports dynamic properties. + * + * @return true if this component supports dynamic properties (default is false) + */ + virtual bool supportsDynamicProperties() = 0; + + /** + * Gets the value of a dynamic property (if it was set). + * + * @param name + * @param value + * @return + */ + bool getDynamicProperty(const std::string name, std::string &value); + + /** + * Sets the value of a new dynamic property. + * + * @param name + * @param value + * @return + */ + bool setDynamicProperty(const std::string name, std::string value); + + /** + * Updates the value of an existing dynamic property. + * + * @param name + * @param value + * @return + */ + bool updateDynamicProperty(const std::string &name, const std::string &value); + + /** + * Provides all dynamic property keys that have been set. + * + * @return vector of property keys + */ + std::vector<std::string> getDynamicProperyKeys(); + virtual ~ConfigurableComponent(); protected: @@ -105,9 +148,14 @@ class __attribute__((visibility("default"))) ConfigurableComponent { // Supported properties std::map<std::string, Property> properties_; + // Dynamic properties + std::map<std::string, Property> dynamic_properties_; + private: std::shared_ptr<logging::Logger> logger_; + bool createDynamicProperty(const std::string &name, const std::string &value); + }; } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/include/core/ProcessContext.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h index ad946a5..e67e412 100644 --- a/libminifi/include/core/ProcessContext.h +++ b/libminifi/include/core/ProcessContext.h @@ -70,9 +70,19 @@ class ProcessContext : public controller::ControllerServiceLookup { return processor_node_->getProperty(name, value); } bool getProperty(const std::string &name, std::string &value, const std::shared_ptr<FlowFile> &flow_file); + bool getDynamicProperty(const std::string &name, std::string &value) { + return processor_node_->getDynamicProperty(name, value); + } + bool getDynamicProperty(const std::string &name, std::string &value, const std::shared_ptr<FlowFile> &flow_file); + std::vector<std::string> getDynamicPropertyKeys() { + return processor_node_->getDynamicProperyKeys(); + } // Sets the property value using the property's string name bool setProperty(const std::string &name, std::string value) { return processor_node_->setProperty(name, value); + } // Sets the dynamic property value using the property's string name + bool setDynamicProperty(const std::string &name, std::string value) { + return processor_node_->setDynamicProperty(name, value); } // Sets the property value using the Property object bool setProperty(Property prop, std::string value) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/include/core/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index b083e75..18fe2a0 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -238,6 +238,10 @@ class __attribute__((visibility("default"))) Processor : public Connectable, pub stream_factory_ = stream_factory; } + virtual bool supportsDynamicProperties() { + return false; + } + protected: virtual void notifyStop() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/include/core/ProcessorNode.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h index f420728..4b99a71 100644 --- a/libminifi/include/core/ProcessorNode.h +++ b/libminifi/include/core/ProcessorNode.h @@ -214,6 +214,10 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { processor_->setMaxConcurrentTasks(tasks); } + virtual bool supportsDynamicProperties() { + return false; + } + virtual bool isRunning(); virtual bool isWorkAvailable(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/include/core/controller/ControllerService.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerService.h b/libminifi/include/core/controller/ControllerService.h index 7fcc338..8f52772 100644 --- a/libminifi/include/core/controller/ControllerService.h +++ b/libminifi/include/core/controller/ControllerService.h @@ -120,6 +120,11 @@ class ControllerService : public ConfigurableComponent, public Connectable { void setState(ControllerServiceState state) { current_state_ = state; } + + virtual bool supportsDynamicProperties() { + return false; + } + protected: std::shared_ptr<Configure> configuration_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/include/core/controller/ControllerServiceNode.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerServiceNode.h b/libminifi/include/core/controller/ControllerServiceNode.h index 341ba90..6da7537 100644 --- a/libminifi/include/core/controller/ControllerServiceNode.h +++ b/libminifi/include/core/controller/ControllerServiceNode.h @@ -104,6 +104,10 @@ class ControllerServiceNode : public CoreComponent, public ConfigurableComponent */ virtual bool disable() = 0; + virtual bool supportsDynamicProperties() { + return false; + } + ControllerServiceNode(const ControllerServiceNode &other) = delete; ControllerServiceNode &operator=(const ControllerServiceNode &parent) = delete; protected: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/include/core/controller/ControllerServiceProvider.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h index 0499d35..651bceb 100644 --- a/libminifi/include/core/controller/ControllerServiceProvider.h +++ b/libminifi/include/core/controller/ControllerServiceProvider.h @@ -213,6 +213,10 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo virtual void enableAllControllerServices() = 0; + virtual bool supportsDynamicProperties() { + return false; + } + protected: /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/src/core/ConfigurableComponent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index a8a0b0d..c534c6d 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -16,12 +16,12 @@ * limitations under the License. */ -#include "core/ConfigurableComponent.h" -#include <memory> #include <utility> #include <string> +#include <vector> #include <set> -#include "core/Property.h" + +#include "core/ConfigurableComponent.h" #include "core/logging/LoggerConfiguration.h" namespace org { @@ -36,6 +36,7 @@ ConfigurableComponent::ConfigurableComponent() ConfigurableComponent::ConfigurableComponent(const ConfigurableComponent &&other) : properties_(std::move(other.properties_)), + dynamic_properties_(std::move(other.dynamic_properties_)), logger_(logging::LoggerFactory<ConfigurableComponent>::getLogger()) { } @@ -160,6 +161,78 @@ bool ConfigurableComponent::setSupportedProperties(std::set<Property> properties return true; } +bool ConfigurableComponent::getDynamicProperty(const std::string name, std::string &value) { + std::lock_guard<std::mutex> lock(configuration_mutex_); + + auto &&it = dynamic_properties_.find(name); + if (it != dynamic_properties_.end()) { + Property item = it->second; + value = item.getValue(); + logger_->log_debug("Component %s dynamic property name %s value %s", name, item.getName(), value); + return true; + } else { + return false; + } +} + +bool ConfigurableComponent::createDynamicProperty(const std::string &name, const std::string &value) { + if (!supportsDynamicProperties()) { + logger_->log_debug("Attempted to create dynamic property %s, but this component does not support creation." + "of dynamic properties.", name); + return false; + } + + Property dyn(name, DEFAULT_DYNAMIC_PROPERTY_DESC, value); + logger_->log_info("Processor %s dynamic property '%s' value '%s'", + name.c_str(), + dyn.getName().c_str(), + value.c_str()); + dynamic_properties_[dyn.getName()] = dyn; + return true; +} + +bool ConfigurableComponent::setDynamicProperty(const std::string name, std::string value) { + std::lock_guard<std::mutex> lock(configuration_mutex_); + auto &&it = dynamic_properties_.find(name); + + if (it != dynamic_properties_.end()) { + Property item = it->second; + item.setValue(value); + dynamic_properties_[item.getName()] = item; + logger_->log_debug("Component %s dynamic property name %s value %s", name, item.getName(), value); + return true; + } else { + return createDynamicProperty(name, value); + } +} + +bool ConfigurableComponent::updateDynamicProperty(const std::string &name, const std::string &value) { + std::lock_guard<std::mutex> lock(configuration_mutex_); + auto &&it = dynamic_properties_.find(name); + + if (it != dynamic_properties_.end()) { + Property item = it->second; + item.addValue(value); + dynamic_properties_[item.getName()] = item; + logger_->log_debug("Component %s dynamic property name %s value %s", name, item.getName(), value); + return true; + } else { + return createDynamicProperty(name, value); + } +} + +std::vector<std::string> ConfigurableComponent::getDynamicProperyKeys() { + std::lock_guard<std::mutex> lock(configuration_mutex_); + + std::vector<std::string> result; + + for (const auto &pair : dynamic_properties_) { + result.emplace_back(pair.first); + } + + return result; +} + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 8b93c51..0526643 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -16,14 +16,12 @@ * limitations under the License. */ -#include "core/yaml/YamlConfiguration.h" #include <memory> -#include <cstdint> #include <string> #include <vector> #include <set> -#include "core/reporting/SiteToSiteProvenanceReportingTask.h" -#include "io/validation.h" + +#include "core/yaml/YamlConfiguration.h" namespace org { namespace apache { @@ -690,10 +688,20 @@ void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode, if (!processor->updateProperty(propertyName, rawValueString)) { std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor); if (proc != 0) { - logger_->log_warn("Received property %s with value %s but is not one of the properties for %s", + logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. " + "Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName()); + if (!processor->setDynamicProperty(propertyName, rawValueString)) { + logger_->log_warn("Unable to set the dynamic property %s with value %s", + propertyName.c_str(), + rawValueString.c_str()); + } else { + logger_->log_warn("Dynamic property %s with value %s set", + propertyName.c_str(), + rawValueString.c_str()); + } } } } @@ -703,10 +711,20 @@ void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode, if (!processor->setProperty(propertyName, rawValueString)) { std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor); if (proc != 0) { - logger_->log_warn("Received property %s with value %s but is not one of the properties for %s", + logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. " + "Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName()); + if (!processor->setDynamicProperty(propertyName, rawValueString)) { + logger_->log_warn("Unable to set the dynamic property %s with value %s", + propertyName.c_str(), + rawValueString.c_str()); + } else { + logger_->log_warn("Dynamic property %s with value %s set", + propertyName.c_str(), + rawValueString.c_str()); + } } } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/test/unit/DynamicPropertyTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/DynamicPropertyTests.cpp b/libminifi/test/unit/DynamicPropertyTests.cpp new file mode 100644 index 0000000..fbc9dfd --- /dev/null +++ b/libminifi/test/unit/DynamicPropertyTests.cpp @@ -0,0 +1,91 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <string> +#include "core/ConfigurableComponent.h" +#include "../TestBase.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +class TestConfigurableComponentSupportsDynamic : public ConfigurableComponent { + public: + virtual bool supportsDynamicProperties() { + return true; + } + + virtual bool canEdit() { + return false; + } +}; + +class TestConfigurableComponentNotSupportsDynamic : public ConfigurableComponent { + public: + virtual bool supportsDynamicProperties() { + return false; + } + + virtual bool canEdit() { + return false; + } +}; + +TEST_CASE("Test Set Dynamic Property", "[testSetDynamicProperty]") { + TestConfigurableComponentSupportsDynamic component; + component.setDynamicProperty("test", "value"); + std::string value; + component.getDynamicProperty("test", value); + REQUIRE(value == "value"); +} + +TEST_CASE("Test Set Dynamic Property 2", "[testSetDynamicProperty2]") { + TestConfigurableComponentSupportsDynamic component; + component.setDynamicProperty("test", "value"); + component.setDynamicProperty("test", "value2"); + std::string value; + component.getDynamicProperty("test", value); + REQUIRE(value == "value2"); +} + +TEST_CASE("Test Set Dynamic Property Fail", "[testSetDynamicPropertyFail]") { + TestConfigurableComponentNotSupportsDynamic component; + REQUIRE(!component.setDynamicProperty("test", "value")); + std::string value; + component.getDynamicProperty("test", value); + REQUIRE(value == ""); +} + +TEST_CASE("Test Set Dynamic Property 3", "[testSetDynamicProperty2]") { + TestConfigurableComponentSupportsDynamic component; + component.setDynamicProperty("test", "value"); + component.setDynamicProperty("test2", "value2"); + std::string value; + auto propertyKeys = component.getDynamicProperyKeys(); + REQUIRE(2 == propertyKeys.size()); + REQUIRE("test" == propertyKeys[0]); + REQUIRE("test2" == propertyKeys[1]); +} + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2e71beb9/libminifi/test/unit/YamlConfigurationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/YamlConfigurationTests.cpp b/libminifi/test/unit/YamlConfigurationTests.cpp index 4700ab3..0b853a6 100644 --- a/libminifi/test/unit/YamlConfigurationTests.cpp +++ b/libminifi/test/unit/YamlConfigurationTests.cpp @@ -339,3 +339,40 @@ NiFi Properties Overrides: {} REQUIRE(it.second->getSource()); } } + +TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") { + TestController test_controller; + + LogTestController &logTestController = LogTestController::getInstance(); + logTestController.setDebug<TestPlan>(); + logTestController.setDebug<core::YamlConfiguration>(); + + std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true); + std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true); + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> + content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + core::YamlConfiguration *yamlConfig = + new core::YamlConfiguration(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration); + + static const std::string TEST_CONFIG_YAML = R"( +Flow Controller: + name: Simple +Processors: +- name: PutFile + class: PutFile + Properties: + Dynamic Property: Bad + )"; + std::istringstream configYamlStream(TEST_CONFIG_YAML); + std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig->getYamlRoot(configYamlStream); + + REQUIRE(rootFlowConfig); + REQUIRE(rootFlowConfig->findProcessor("PutFile")); + REQUIRE(NULL != rootFlowConfig->findProcessor("PutFile")->getUUID()); + REQUIRE(!rootFlowConfig->findProcessor("PutFile")->getUUIDStr().empty()); + + REQUIRE(LogTestController::getInstance().contains("[warning] Unable to set the dynamic property " + "Dynamic Property with value Bad")); +}
