This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit e1433c1b4e04aaaa1bac46d338631e7689549a0c Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Jul 26 14:07:24 2021 +0200 MINIFICPP-1522 Log invalid attribute in case of YAML parse failure Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1143 --- .../tests/unit/YamlConfigurationTests.cpp | 327 +++++++++++---------- libminifi/src/core/yaml/YamlConfiguration.cpp | 10 +- 2 files changed, 186 insertions(+), 151 deletions(-) diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp index b4b8f6b..8121bda 100644 --- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp +++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp @@ -18,6 +18,7 @@ #include <map> #include <memory> +#include <chrono> #include "core/repository/VolatileContentRepository.h" #include "core/ProcessGroup.h" #include "core/RepositoryFactory.h" @@ -26,6 +27,8 @@ #include "TestBase.h" #include "utils/TestUtils.h" +using namespace std::chrono_literals; // NOLINT using namespace directive is required for literals + TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") { TestController test_controller; @@ -37,160 +40,184 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") { core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration); SECTION("loading YAML without optional component IDs works") { - static const std::string CONFIG_YAML_WITHOUT_IDS = "" - "MiNiFi Config Version: 1\n" - "Flow Controller:\n" - " name: MiNiFi Flow\n" - " comment:\n" - "\n" - "Core Properties:\n" - " flow controller graceful shutdown period: 10 sec\n" - " flow service write delay interval: 500 ms\n" - " administrative yield duration: 30 sec\n" - " bored yield duration: 10 millis\n" - "\n" - "FlowFile Repository:\n" - " partitions: 256\n" - " checkpoint interval: 2 mins\n" - " always sync: false\n" - " Swap:\n" - " threshold: 20000\n" - " in period: 5 sec\n" - " in threads: 1\n" - " out period: 5 sec\n" - " out threads: 4\n" - "\n" - "Provenance Repository:\n" - " provenance rollover time: 1 min\n" - "\n" - "Content Repository:\n" - " content claim max appendable size: 10 MB\n" - " content claim max flow files: 100\n" - " always sync: false\n" - "\n" - "Component Status Repository:\n" - " buffer size: 1440\n" - " snapshot frequency: 1 min\n" - "\n" - "Security Properties:\n" - " keystore: /tmp/ssl/localhost-ks.jks\n" - " keystore type: JKS\n" - " keystore password: localtest\n" - " key password: localtest\n" - " truststore: /tmp/ssl/localhost-ts.jks\n" - " truststore type: JKS\n" - " truststore password: localtest\n" - " ssl protocol: TLS\n" - " Sensitive Props:\n" - " key:\n" - " algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL\n" - " provider: BC\n" - "\n" - "Processors:\n" - " - name: TailFile\n" - " class: org.apache.nifi.processors.standard.TailFile\n" - " max concurrent tasks: 1\n" - " scheduling strategy: TIMER_DRIVEN\n" - " scheduling period: 1 sec\n" - " penalization period: 30 sec\n" - " yield period: 1 sec\n" - " run duration nanos: 0\n" - " auto-terminated relationships list:\n" - " Properties:\n" - " File to Tail: logs/minifi-app.log\n" - " Rolling Filename Pattern: minifi-app*\n" - " Initial Start Position: Beginning of File\n" - "\n" - "Connections:\n" - " - name: TailToS2S\n" - " source name: TailFile\n" - " source relationship name: success\n" - " destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n" - " max work queue size: 0\n" - " max work queue data size: 1 MB\n" - " flowfile expiration: 60 sec\n" - " queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer\n" - "\n" - "Remote Processing Groups:\n" - " - name: NiFi Flow\n" - " comment:\n" - " url: https://localhost:8090/nifi\n" - " timeout: 30 secs\n" - " yield period: 10 sec\n" - " Input Ports:\n" - " - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n" - " name: tailed log\n" - " comments:\n" - " max concurrent tasks: 1\n" - " use compression: false\n" - "\n" - "Provenance Reporting:\n" - " comment:\n" - " scheduling strategy: TIMER_DRIVEN\n" - " scheduling period: 30 sec\n" - " host: localhost\n" - " port name: provenance\n" - " port: 8090\n" - " port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n" - " url: https://localhost:8090/\n" - " originating url: http://${hostname(true)}:8081/nifi\n" - " use compression: true\n" - " timeout: 30 secs\n" - " batch size: 1000"; - - std::istringstream configYamlStream(CONFIG_YAML_WITHOUT_IDS); - std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig.getYamlRoot(configYamlStream); + static const std::string CONFIG_YAML_WITHOUT_IDS = + R"( +MiNiFi Config Version: 1 +Flow Controller: + name: MiNiFi Flow + comment: - REQUIRE(rootFlowConfig); - REQUIRE(rootFlowConfig->findProcessorByName("TailFile")); - utils::Identifier uuid = rootFlowConfig->findProcessorByName("TailFile")->getUUID(); - REQUIRE(uuid); - REQUIRE(!rootFlowConfig->findProcessorByName("TailFile")->getUUIDStr().empty()); - REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks()); - REQUIRE( - core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy()); - REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks()); - REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano()); - REQUIRE(std::chrono::seconds(30) == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod()); - REQUIRE(1 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec()); - REQUIRE(0 == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano()); +Core Properties: + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis - std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap; - rootFlowConfig->getConnections(connectionMap); - REQUIRE(2 == connectionMap.size()); - // This is a map of UUID->Connection, and we don't know UUID, so just going to loop over it - for (auto it : connectionMap) { - REQUIRE(it.second); - REQUIRE(!it.second->getUUIDStr().empty()); - REQUIRE(it.second->getDestination()); - REQUIRE(it.second->getSource()); - REQUIRE(60000 == it.second->getFlowExpirationDuration()); +FlowFile Repository: + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: + threshold: 20000 + in period: 5 sec + in threads: 1 + out period: 5 sec + out threads: 4 + +Provenance Repository: + provenance rollover time: 1 min + +Content Repository: + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false + +Component Status Repository: + buffer size: 1440 + snapshot frequency: 1 min + +Security Properties: + keystore: /tmp/ssl/localhost-ks.jks + keystore type: JKS + keystore password: localtest + key password: localtest + truststore: /tmp/ssl/localhost-ts.jks + truststore type: JKS + truststore password: localtest + ssl protocol: TLS + Sensitive Props: + key: + algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL + provider: BC + +Processors: + - name: TailFile + class: org.apache.nifi.processors.standard.TailFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + File to Tail: logs/minifi-app.log + Rolling Filename Pattern: minifi-app* + Initial Start Position: Beginning of File + +Connections: + - name: TailToS2S + source name: TailFile + source relationship name: success + destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61 + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer + +Remote Processing Groups: + - name: NiFi Flow + comment: + url: https://localhost:8090/nifi + timeout: 30 secs + yield period: 10 sec + Input Ports: + - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61 + name: tailed log + comments: + max concurrent tasks: 1 + use compression: false + +Provenance Reporting: + comment: + scheduling strategy: TIMER_DRIVEN + scheduling period: 30 sec + host: localhost + port name: provenance + port: 8090 + port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56 + url: https://localhost:8090/ + originating url: http://${hostname(true)}:8081/nifi + use compression: true + timeout: 30 secs + batch size: 1000; + )"; + + std::istringstream configYamlStream(CONFIG_YAML_WITHOUT_IDS); + std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig.getYamlRoot(configYamlStream); + + REQUIRE(rootFlowConfig); + REQUIRE(rootFlowConfig->findProcessorByName("TailFile")); + utils::Identifier uuid = rootFlowConfig->findProcessorByName("TailFile")->getUUID(); + REQUIRE(uuid); + REQUIRE(!rootFlowConfig->findProcessorByName("TailFile")->getUUIDStr().empty()); + REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks()); + REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy()); + REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano()); + REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod()); + REQUIRE(1 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec()); + REQUIRE(0 == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano()); + + std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap; + rootFlowConfig->getConnections(connectionMap); + REQUIRE(2 == connectionMap.size()); + // This is a map of UUID->Connection, and we don't know UUID, so just going to loop over it + for (auto it : connectionMap) { + REQUIRE(it.second); + REQUIRE(!it.second->getUUIDStr().empty()); + REQUIRE(it.second->getDestination()); + REQUIRE(it.second->getSource()); + REQUIRE(60000 == it.second->getFlowExpirationDuration()); + } } -} SECTION("missing required field in YAML throws exception") { - static const std::string CONFIG_YAML_NO_RPG_PORT_ID = "" - "MiNiFi Config Version: 1\n" - "Flow Controller:\n" - " name: MiNiFi Flow\n" - "Processors: []\n" - "Connections: []\n" - "Remote Processing Groups:\n" - " - name: NiFi Flow\n" - " comment:\n" - " url: https://localhost:8090/nifi\n" - " timeout: 30 secs\n" - " yield period: 10 sec\n" - " Input Ports:\n" - " - name: tailed log\n" - " comments:\n" - " max concurrent tasks: 1\n" - " use compression: false\n" - "\n"; - - std::istringstream configYamlStream(CONFIG_YAML_NO_RPG_PORT_ID); - REQUIRE_THROWS_AS(yamlConfig.getYamlRoot(configYamlStream), std::invalid_argument); -} + static const std::string CONFIG_YAML_NO_RPG_PORT_ID = + R"( +MiNiFi Config Version: 1 +Flow Controller: + name: MiNiFi Flow +Processors: [] +Connections: [] +Remote Processing Groups: + - name: NiFi Flow + comment: + url: https://localhost:8090/nifi + timeout: 30 secs + yield period: 10 sec + Input Ports: + - name: tailed log + comments: + max concurrent tasks: 1 + use compression: false + )"; + + std::istringstream configYamlStream(CONFIG_YAML_NO_RPG_PORT_ID); + REQUIRE_THROWS_AS(yamlConfig.getYamlRoot(configYamlStream), std::invalid_argument); + } + + SECTION("Validated YAML property failure throws exception and logs invalid attribute name") { + static const std::string CONFIG_YAML_EMPTY_RETRY_ATTRIBUTE = + R"( +Flow Controller: + name: MiNiFi Flow +Processors: + - name: RetryFlowFile + class: org.apache.nifi.processors.standard.RetryFlowFile + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + auto-terminated relationships list: + Properties: + Retry Attribute: "" +Connections: [] +Remote Processing Groups: [] +Provenance Reporting: + )"; + + std::istringstream configYamlStream(CONFIG_YAML_EMPTY_RETRY_ATTRIBUTE); + REQUIRE_THROWS_AS(yamlConfig.getYamlRoot(configYamlStream), utils::internal::InvalidValueException); + REQUIRE(LogTestController::getInstance().contains("Invalid value was set for property 'Retry Attribute' creating component 'RetryFlowFile'")); + } } TEST_CASE("Test YAML v3 Invalid Type", "[YamlConfiguration3]") { @@ -452,7 +479,7 @@ NiFi Properties Overrides: {} REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy()); REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks()); REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano()); - REQUIRE(std::chrono::seconds(30) == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod()); + REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod()); REQUIRE(1 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec()); REQUIRE(0 == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano()); diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index d5d2e6e..1a4850b 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -764,8 +764,16 @@ void YamlConfiguration::parseSingleProperty(const std::string& propertyName, con core::Property myProp(propertyName, "", ""); processor->getProperty(propertyName, myProp); const PropertyValue coercedValue = getValidatedProcessorPropertyForDefaultTypeInfo(myProp, propertyValueNode); + bool property_set = false; + try { + property_set = processor->setProperty(myProp, coercedValue); + } catch(const utils::internal::InvalidValueException&) { + auto component = std::dynamic_pointer_cast<core::CoreComponent>(processor); + logger_->log_error("Invalid value was set for property '%s' creating component '%s'", propertyName, component->getName()); + throw; + } const std::string rawValueString = propertyValueNode.as<std::string>(); - if (!processor->setProperty(myProp, coercedValue)) { + if (!property_set) { std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor); if (proc) { logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName());
