MINIFI-294 Required vs. optional fields in config YAML A long description of the changes in this commit can be found in the JIRA for MINIFI-294. Basically, better checks are now in place and in the case of missing required fields, better error messaging. This builds upon improvements made in MINIFI-275, and also updates the config.yml example in the README.md file to match the code.
This closes #90. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/153b25b2 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/153b25b2 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/153b25b2 Branch: refs/heads/master Commit: 153b25b2ed9f42380548e261277a2c545278c8bc Parents: b26ac36 Author: Kevin Doran <[email protected]> Authored: Wed May 3 18:22:14 2017 -0400 Committer: Aldrin Piri <[email protected]> Committed: Thu May 4 15:02:56 2017 -0400 ---------------------------------------------------------------------- README.md | 1 + libminifi/src/core/yaml/YamlConfiguration.cpp | 253 ++++++++++----------- 2 files changed, 123 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/153b25b2/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 7baefbc..ea73cd7 100644 --- a/README.md +++ b/README.md @@ -255,6 +255,7 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc Connections: - name: TransferFilesToRPG id: 471deef6-2a6e-4a7d-912a-81cc17e3a207 + source name: GetFile source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 source relationship name: success destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204 http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/153b25b2/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 0429f2c..489bdaa 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -69,13 +69,12 @@ void YamlConfiguration::parseProcessorNodeYaml( YAML::Node 
procNode = iter->as<YAML::Node>(); checkRequiredField(&procNode, "name", CONFIG_YAML_PROCESSORS_KEY); - checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY); - procCfg.name = procNode["name"].as<std::string>(); procCfg.id = getOrGenerateId(&procNode); uuid_parse(procCfg.id.c_str(), uuid); logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id); + checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY); procCfg.javaClass = procNode["class"].as<std::string>(); logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass); @@ -98,79 +97,73 @@ void YamlConfiguration::parseProcessorNodeYaml( } processor->setName(procCfg.name); - procCfg.maxConcurrentTasks = procNode["max concurrent tasks"] - .as<std::string>(); - logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", - procCfg.maxConcurrentTasks); - + checkRequiredField(&procNode, "scheduling strategy", CONFIG_YAML_PROCESSORS_KEY); procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>(); - logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", - procCfg.schedulingStrategy); + logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy); + checkRequiredField(&procNode, "scheduling period", CONFIG_YAML_PROCESSORS_KEY); procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>(); - logger_->log_debug("parseProcessorNode: scheduling period => [%s]", - procCfg.schedulingPeriod); + logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod); - procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>(); - logger_->log_debug("parseProcessorNode: penalization period => [%s]", - procCfg.penalizationPeriod); + if (procNode["max concurrent tasks"]) { + procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>(); + logger_->log_debug("parseProcessorNode: max concurrent tasks => 
[%s]", procCfg.maxConcurrentTasks); + } - procCfg.yieldPeriod = procNode["yield period"].as<std::string>(); - logger_->log_debug("parseProcessorNode: yield period => [%s]", - procCfg.yieldPeriod); + if (procNode["penalization period"]) { + procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>(); + logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod); + } + + if (procNode["yield period"]) { + procCfg.yieldPeriod = procNode["yield period"].as<std::string>(); + logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod); + } - procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>(); - logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", - procCfg.runDurationNanos); + if (procNode["run duration nanos"]) { + procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>(); + logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos); + } // handle auto-terminated relationships - YAML::Node autoTerminatedSequence = - procNode["auto-terminated relationships list"]; - std::vector<std::string> rawAutoTerminatedRelationshipValues; - if (autoTerminatedSequence.IsSequence() - && !autoTerminatedSequence.IsNull() - && autoTerminatedSequence.size() > 0) { - for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); - relIter != autoTerminatedSequence.end(); ++relIter) { - std::string autoTerminatedRel = relIter->as<std::string>(); - rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); + if (procNode["auto-terminated relationships list"]) { + YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"]; + std::vector<std::string> rawAutoTerminatedRelationshipValues; + if (autoTerminatedSequence.IsSequence() + && !autoTerminatedSequence.IsNull() + && autoTerminatedSequence.size() > 0) { + for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); + relIter != 
autoTerminatedSequence.end(); ++relIter) { + std::string autoTerminatedRel = relIter->as<std::string>(); + rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); + } } + procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues; } - procCfg.autoTerminatedRelationships = - rawAutoTerminatedRelationshipValues; // handle processor properties - YAML::Node propertiesNode = procNode["Properties"]; - parsePropertiesNodeYaml(&propertiesNode, processor); + if (procNode["Properties"]) { + YAML::Node propertiesNode = procNode["Properties"]; + parsePropertiesNodeYaml(&propertiesNode, processor); + } // Take care of scheduling core::TimeUnit unit; - if (core::Property::StringToTime(procCfg.schedulingPeriod, - schedulingPeriod, unit) - && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, - schedulingPeriod)) { - logger_->log_debug( - "convert: parseProcessorNode: schedulingPeriod => [%d] ns", - schedulingPeriod); + if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) + && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { + logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); processor->setSchedulingPeriodNano(schedulingPeriod); } - if (core::Property::StringToTime(procCfg.penalizationPeriod, - penalizationPeriod, unit) - && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, - penalizationPeriod)) { - logger_->log_debug( - "convert: parseProcessorNode: penalizationPeriod => [%d] ms", - penalizationPeriod); + if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) + && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { + logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod); processor->setPenalizationPeriodMsec(penalizationPeriod); } if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) - && 
core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, - yieldPeriod)) { - logger_->log_debug( - "convert: parseProcessorNode: yieldPeriod => [%d] ms", - yieldPeriod); + && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { + logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); processor->setYieldPeriodMsec(yieldPeriod); } @@ -189,26 +182,20 @@ void YamlConfiguration::parseProcessorNodeYaml( } int64_t maxConcurrentTasks; - if (core::Property::StringToInt(procCfg.maxConcurrentTasks, - maxConcurrentTasks)) { - logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", - maxConcurrentTasks); + if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) { + logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks); } - if (core::Property::StringToInt(procCfg.runDurationNanos, - runDurationNanos)) { - logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", - runDurationNanos); + if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) { + logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); processor->setRunDurationNano((uint64_t) runDurationNanos); } std::set<core::Relationship> autoTerminatedRelationships; for (auto &&relString : procCfg.autoTerminatedRelationships) { core::Relationship relationship(relString, ""); - logger_->log_debug( - "parseProcessorNode: autoTerminatedRelationship => [%s]", - relString); + logger_->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString); autoTerminatedRelationships.insert(relationship); } @@ -240,59 +227,58 @@ void YamlConfiguration::parseRemoteProcessGroupYaml( ++iter) { YAML::Node currRpgNode = iter->as<YAML::Node>(); + checkRequiredField(&currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); auto name = currRpgNode["name"].as<std::string>(); id = 
getOrGenerateId(&currRpgNode); - logger_->log_debug( - "parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id); + logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id); + checkRequiredField(&currRpgNode, "url", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); std::string url = currRpgNode["url"].as<std::string>(); logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url); - std::string timeout = currRpgNode["timeout"].as<std::string>(); - logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", - timeout); - - std::string yieldPeriod = currRpgNode["yield period"].as<std::string>(); - logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", - yieldPeriod); - - YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>(); - YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>(); core::ProcessGroup *group = NULL; - - uuid_parse(id.c_str(), uuid); - + core::TimeUnit unit; int64_t timeoutValue = -1; int64_t yieldPeriodValue = -1; - + uuid_parse(id.c_str(), uuid); group = this->createRemoteProcessGroup(name.c_str(), uuid).release(); group->setParent(parentGroup); parentGroup->addProcessGroup(group); - core::TimeUnit unit; - - if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) - && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, - yieldPeriodValue) && group) { - logger_->log_debug( - "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", - yieldPeriodValue); - group->setYieldPeriodMsec(yieldPeriodValue); + if (currRpgNode["yield period"]) { + std::string yieldPeriod = currRpgNode["yield period"].as<std::string>(); + logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod); + + if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) + && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, + yieldPeriodValue) && group) { + logger_->log_debug( + "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", + 
yieldPeriodValue); + group->setYieldPeriodMsec(yieldPeriodValue); + } } - if (core::Property::StringToTime(timeout, timeoutValue, unit) - && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, - timeoutValue) && group) { - logger_->log_debug( - "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", - timeoutValue); - group->setTimeOut(timeoutValue); + if (currRpgNode["timeout"]) { + std::string timeout = currRpgNode["timeout"].as<std::string>(); + logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout); + + if (core::Property::StringToTime(timeout, timeoutValue, unit) + && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, + timeoutValue) && group) { + logger_->log_debug( + "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", + timeoutValue); + group->setTimeOut(timeoutValue); + } } group->setTransmitting(true); group->setURL(url); + checkRequiredField(&currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); + YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>(); if (inputPorts && inputPorts.IsSequence()) { for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) { @@ -303,6 +289,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml( this->parsePortYaml(&currPort, group, SEND); } // for node } + YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>(); if (outputPorts && outputPorts.IsSequence()) { for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) { @@ -408,6 +395,7 @@ void YamlConfiguration::parseConnectionYaml( // Configure basic connection uuid_t uuid; + checkRequiredField(&connectionNode, "name", CONFIG_YAML_CONNECTIONS_KEY); std::string name = connectionNode["name"].as<std::string>(); std::string id = getOrGenerateId(&connectionNode); uuid_parse(id.c_str(), uuid); @@ -417,44 +405,44 @@ void YamlConfiguration::parseConnectionYaml( // Configure connection source - auto rawRelationship = 
connectionNode["source relationship name"] - .as<std::string>(); + checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY); + auto rawRelationship = connectionNode["source relationship name"].as<std::string>(); core::Relationship relationship(rawRelationship, ""); - logger_->log_debug( - "parseConnection: relationship => [%s]", rawRelationship); + logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship); if (connection) { connection->setRelationship(relationship); } uuid_t srcUUID; - std::string connectionSrcProcName = connectionNode["source name"] - .as<std::string>(); - if (connectionNode["source id"]) { - std::string connectionSrcProcId = connectionNode["source id"] - .as<std::string>(); + if (connectionNode["source id"]) { + std::string connectionSrcProcId = connectionNode["source id"].as<std::string>(); uuid_parse(connectionSrcProcId.c_str(), srcUUID); + logger_->log_debug("Using 'source id' to match source with same id for " + "connection '%s': source id => [%s]", name, connectionSrcProcId); } else { - // if we don't have a source id, try harder to resolve the source processor. - // config schema v2 will make this unnecessary + // if we don't have a source id, try to resolve using source name. 
config schema v2 will make this unnecessary + checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY); + std::string connectionSrcProcName = connectionNode["source name"].as<std::string>(); uuid_t tmpUUID; - if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && - NULL != parent->findProcessor(tmpUUID)) { + if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && NULL != parent->findProcessor(tmpUUID)) { // the source name is a remote port id, so use that as the source id uuid_copy(srcUUID, tmpUUID); + logger_->log_debug("Using 'source name' containing a remote port id to match the source for " + "connection '%s': source name => [%s]", name, connectionSrcProcName); } else { // lastly, look the processor up by name auto srcProcessor = parent->findProcessor(connectionSrcProcName); if (NULL != srcProcessor) { srcProcessor->getUUID(srcUUID); + logger_->log_debug("Using 'source name' to match source with same name for " + "connection '%s': source name => [%s]", name, connectionSrcProcName); } else { // we ran out of ways to discover the source processor logger_->log_error( - "Could not locate a source with name %s to create a connection", - connectionSrcProcName); + "Could not locate a source with name %s to create a connection", connectionSrcProcName); throw std::invalid_argument( - "Could not locate a source with name " + - connectionSrcProcName + " to create a connection "); + "Could not locate a source with name " + connectionSrcProcName + " to create a connection "); } } } @@ -462,33 +450,36 @@ void YamlConfiguration::parseConnectionYaml( // Configure connection destination uuid_t destUUID; - std::string connectionDestProcName = connectionNode["destination name"] - .as<std::string>(); if (connectionNode["destination id"]) { - std::string connectionDestProcId = connectionNode["destination id"] - .as<std::string>(); + std::string connectionDestProcId = connectionNode["destination id"].as<std::string>(); 
uuid_parse(connectionDestProcId.c_str(), destUUID); + logger_->log_debug("Using 'destination id' to match destination with same id for " + "connection '%s': destination id => [%s]", name, connectionDestProcId); } else { // we use the same logic as above for resolving the source processor // for looking up the destination processor in absence of a processor id + checkRequiredField(&connectionNode, "destination name", CONFIG_YAML_CONNECTIONS_KEY); + std::string connectionDestProcName = connectionNode["destination name"].as<std::string>(); uuid_t tmpUUID; if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) && NULL != parent->findProcessor(tmpUUID)) { // the destination name is a remote port id, so use that as the dest id uuid_copy(destUUID, tmpUUID); + logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for " + "connection '%s': destination name => [%s]", name, connectionDestProcName); } else { // look the processor up by name auto destProcessor = parent->findProcessor(connectionDestProcName); if (NULL != destProcessor) { destProcessor->getUUID(destUUID); + logger_->log_debug("Using 'destination name' to match destination with same name for " + "connection '%s': destination name => [%s]", name, connectionDestProcName); } else { // we ran out of ways to discover the destination processor logger_->log_error( - "Could not locate a destination with name %s to create a connection", - connectionDestProcName); + "Could not locate a destination with name %s to create a connection", connectionDestProcName); throw std::invalid_argument( - "Could not locate a destination with name " + - connectionDestProcName + " to create a connection"); + "Could not locate a destination with name " + connectionDestProcName + " to create a connection"); } } } @@ -539,21 +530,21 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, // handle port properties YAML::Node nodeVal = portNode->as<YAML::Node>(); YAML::Node propertiesNode = 
nodeVal["Properties"]; - parsePropertiesNodeYaml(&propertiesNode, processor); // add processor to parent parent->addProcessor(processor); processor->setScheduledState(core::RUNNING); - auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"] - .as<std::string>(); - int64_t maxConcurrentTasks; - if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { + + if (inputPortsObj["max concurrent tasks"]) { + auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>(); + int64_t maxConcurrentTasks; + if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { + processor->setMaxConcurrentTasks(maxConcurrentTasks); + } + logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); processor->setMaxConcurrentTasks(maxConcurrentTasks); } - logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", - maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); } void YamlConfiguration::parsePropertiesNodeYaml( @@ -624,8 +615,8 @@ void YamlConfiguration::checkRequiredField( "' section of configuration file]"; } } - logger_->log_error(errorMessage.c_str()); - throw std::invalid_argument(errorMessage); + logger_->log_error(errMsg.c_str()); + throw std::invalid_argument(errMsg); } }
