adamdebreceni commented on code in PR #1391: URL: https://github.com/apache/nifi-minifi-cpp/pull/1391#discussion_r989811291
########## libminifi/src/core/flow/StructuredConfiguration.cpp: ########## @@ -0,0 +1,896 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <memory> +#include <vector> +#include <set> +#include <cinttypes> + +#include "core/flow/StructuredConfiguration.h" +#include "core/flow/CheckRequiredField.h" +#include "core/flow/StructuredConnectionParser.h" +#include "core/state/Value.h" +#include "Defaults.h" +#include "utils/TimeUtil.h" +#include "utils/RegexUtils.h" + +namespace org::apache::nifi::minifi::core::flow { + +std::shared_ptr<utils::IdGenerator> StructuredConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator(); + +StructuredConfiguration::StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger) + : FlowConfiguration(std::move(ctx)), + logger_(std::move(logger)) {} + +std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGroup(const Node& root_flow_node) { + auto flow_controller_node = root_flow_node[CONFIG_FLOW_CONTROLLER_KEY]; + auto root_group = parseProcessGroup(flow_controller_node, root_flow_node, true); + this->name_ = root_group->getName(); + return root_group; +} + +std::unique_ptr<core::ProcessGroup> 
StructuredConfiguration::createProcessGroup(const Node& node, bool is_root) { + int version = 0; + + checkRequiredField(node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY); + auto flowName = node["name"].getString().value(); + + utils::Identifier uuid; + // assignment throws on invalid uuid + uuid = getOrGenerateId(node); + + if (node["version"]) { + version = node["version"].getInt().value(); + } + + logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName); + std::unique_ptr<core::ProcessGroup> group; + if (is_root) { + group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version); + } else { + group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version); + } + + if (node["onschedule retry interval"]) { + auto onScheduleRetryPeriod = node["onschedule retry interval"].getString().value(); + logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod); + + auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod); + if (on_schedule_retry_period_value.has_value() && group) { + logger_->log_debug("parseRootProcessGroup: onschedule retry => [%" PRId64 "] ms", on_schedule_retry_period_value->count()); + group->setOnScheduleRetryPeriod(on_schedule_retry_period_value->count()); + } + } + + return group; +} + +std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(const Node& headerNode, const Node& yamlNode, bool is_root) { + auto group = createProcessGroup(headerNode, is_root); + Node processorsNode = yamlNode[CONFIG_PROCESSORS_KEY]; + Node connectionsNode = yamlNode[StructuredConnectionParser::CONFIG_CONNECTIONS_KEY]; + Node funnelsNode = yamlNode[CONFIG_FUNNELS_KEY]; + Node remoteProcessingGroupsNode = [&] { + // assignment is not supported on invalid Yaml nodes + Node candidate = yamlNode[CONFIG_REMOTE_PROCESS_GROUP_KEY]; + if (candidate) { + return candidate; + } + return 
yamlNode[CONFIG_REMOTE_PROCESS_GROUP_KEY_V3]; + }(); + Node childProcessGroupNodeSeq = yamlNode["Process Groups"]; + + parseProcessorNode(processorsNode, group.get()); + parseRemoteProcessGroup(remoteProcessingGroupsNode, group.get()); + parseFunnels(funnelsNode, group.get()); + // parse connections last to give feedback if the source and/or destination + // is not in the same process group + parseConnection(connectionsNode, group.get()); + + if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.isSequence()) { + for (const auto childProcessGroupNode : childProcessGroupNodeSeq) { + group->addProcessGroup(parseProcessGroup(childProcessGroupNode, childProcessGroupNode)); + } + } + return group; +} + +std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& rootYamlNode) { + uuids_.clear(); + Node controllerServiceNode = rootYamlNode[CONFIG_CONTROLLER_SERVICES_KEY]; + Node provenanceReportNode = rootYamlNode[CONFIG_PROVENANCE_REPORT_KEY]; + + parseControllerServices(controllerServiceNode); + // Create the root process group + std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroup(rootYamlNode); + parseProvenanceReporting(provenanceReportNode, root.get()); + + // set the controller services into the root group. 
+ for (const auto& controller_service : controller_services_->getAllControllerServices()) { + root->addControllerService(controller_service->getName(), controller_service); + root->addControllerService(controller_service->getUUIDStr(), controller_service); + } + + return root; +} + +void StructuredConfiguration::parseProcessorNode(const Node& processors_node, core::ProcessGroup* parentGroup) { + int64_t runDurationNanos = -1; + utils::Identifier uuid; + std::unique_ptr<core::Processor> processor; + + if (!parentGroup) { + logger_->log_error("parseProcessNodeYaml: no parent group exists"); + return; + } + + if (!processors_node) { + throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); + } + if (!processors_node.isSequence()) { + throw std::invalid_argument( + "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); + } + // Evaluate sequence of processors + for (const auto procNode : processors_node) { + core::ProcessorConfig procCfg; + + checkRequiredField(procNode, "name", CONFIG_PROCESSORS_KEY); + procCfg.name = procNode["name"].getString().value(); + procCfg.id = getOrGenerateId(procNode); + + uuid = procCfg.id; + logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id); + checkRequiredField(procNode, "class", CONFIG_PROCESSORS_KEY); + procCfg.javaClass = procNode["class"].getString().value(); + logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass); + + // Determine the processor name only from the Java class + auto lastOfIdx = procCfg.javaClass.find_last_of('.'); + if (lastOfIdx != std::string::npos) { + lastOfIdx++; // if a value is found, increment to move beyond the . 
+ std::string processorName = procCfg.javaClass.substr(lastOfIdx); + processor = this->createProcessor(processorName, procCfg.javaClass, uuid); + } else { + // Allow unqualified class names for core processors + processor = this->createProcessor(procCfg.javaClass, uuid); + } + + if (!processor) { + logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id); + throw std::invalid_argument("Could not create processor " + procCfg.name); + } + + processor->setName(procCfg.name); + + processor->setFlowIdentifier(flow_version_->getFlowIdentifier()); + + procCfg.schedulingStrategy = getOptionalField(procNode, "scheduling strategy", DEFAULT_SCHEDULING_STRATEGY, CONFIG_PROCESSORS_KEY); + logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy); + + procCfg.schedulingPeriod = getOptionalField(procNode, "scheduling period", DEFAULT_SCHEDULING_PERIOD_STR, CONFIG_PROCESSORS_KEY); + + logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod); + + if (auto tasksNode = procNode["max concurrent tasks"]) { + if (auto int_val = tasksNode.getUInt64()) { + procCfg.maxConcurrentTasks = std::to_string(int_val.value()); + } else { + procCfg.maxConcurrentTasks = tasksNode.getString().value(); + } + logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks); + } + + if (procNode["penalization period"]) { + procCfg.penalizationPeriod = procNode["penalization period"].getString().value(); + logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod); + } + + if (procNode["yield period"]) { + procCfg.yieldPeriod = procNode["yield period"].getString().value(); + logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod); + } + + if (auto runNode = procNode["run duration nanos"]) { + if (auto int_val = runNode.getUInt64()) { + procCfg.runDurationNanos = std::to_string(int_val.value()); + 
} else { + procCfg.runDurationNanos = runNode.getString().value(); + } + logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos); + } + + // handle auto-terminated relationships + if (procNode["auto-terminated relationships list"]) { + Node autoTerminatedSequence = procNode["auto-terminated relationships list"]; + std::vector<std::string> rawAutoTerminatedRelationshipValues; + if (autoTerminatedSequence.isSequence() && autoTerminatedSequence.size() > 0) { + for (const auto autoTerminatedRel : autoTerminatedSequence) { + rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel.getString().value()); + } + } + procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues; + } + + // handle processor properties + if (procNode["Properties"]) { + Node propertiesNode = procNode["Properties"]; + parsePropertiesNode(propertiesNode, *processor, procCfg.name, CONFIG_PROCESSORS_KEY); + } + + // Take care of scheduling + + if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") { + if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(procCfg.schedulingPeriod)) { + logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", scheduling_period->count()); + processor->setSchedulingPeriodNano(*scheduling_period); + } + } else { + processor->setCronPeriod(procCfg.schedulingPeriod); + } + + if (auto penalization_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.penalizationPeriod)) { + logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalization_period->count()); + processor->setPenalizationPeriod(penalization_period.value()); + } + + if (auto yield_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.yieldPeriod)) { + logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yield_period->count()); + 
processor->setYieldPeriodMsec(yield_period.value()); + } + + // Default to running + processor->setScheduledState(core::RUNNING); + + if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { + processor->setSchedulingStrategy(core::TIMER_DRIVEN); + logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); + } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { + processor->setSchedulingStrategy(core::EVENT_DRIVEN); + logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); + } else { + processor->setSchedulingStrategy(core::CRON_DRIVEN); + logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); + } + + int32_t maxConcurrentTasks; + if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) { + logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); + processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks); + } + + if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) { + logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); + processor->setRunDurationNano(std::chrono::nanoseconds(runDurationNanos)); + } + + std::vector<core::Relationship> autoTerminatedRelationships; + for (auto &&relString : procCfg.autoTerminatedRelationships) { + core::Relationship relationship(relString, ""); + logger_->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString); + autoTerminatedRelationships.push_back(relationship); + } + + processor->setAutoTerminatedRelationships(autoTerminatedRelationships); + + parentGroup->addProcessor(std::move(processor)); + } +} + +void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, core::ProcessGroup* parentGroup) { + utils::Identifier uuid; + std::string id; + + if (!parentGroup) { + logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists"); + return; + } + + if (!rpg_node_seq || 
!rpg_node_seq.isSequence()) { + return; + } + for (const auto currRpgNode : rpg_node_seq) { + checkRequiredField(currRpgNode, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY); + auto name = currRpgNode["name"].getString().value(); + id = getOrGenerateId(currRpgNode); + + logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id); + + auto url = getOptionalField(currRpgNode, "url", "", CONFIG_REMOTE_PROCESS_GROUP_KEY); + + logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url); + + uuid = id; + auto group = createRemoteProcessGroup(name, uuid); + group->setParent(parentGroup); + + if (currRpgNode["yield period"]) { + auto yieldPeriod = currRpgNode["yield period"].getString().value(); + logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod); + + auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod); + if (yield_period_value.has_value() && group) { + logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yield_period_value->count()); + group->setYieldPeriodMsec(*yield_period_value); + } + } + + if (currRpgNode["timeout"]) { + auto timeout = currRpgNode["timeout"].getString().value(); + logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout); + + auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout); + if (timeout_value.has_value() && group) { + logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeout_value->count()); + group->setTimeout(timeout_value->count()); + } + } + + if (currRpgNode["local network interface"]) { + auto interface = currRpgNode["local network interface"].getString().value(); + logger_->log_debug("parseRemoteProcessGroupYaml: local network interface => [%s]", interface); + group->setInterface(interface); + } + + if (currRpgNode["transport protocol"]) { + auto transport_protocol = currRpgNode["transport 
protocol"].getString().value(); + logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol); + if (transport_protocol == "HTTP") { + group->setTransportProtocol(transport_protocol); + if (currRpgNode["proxy host"]) { + auto http_proxy_host = currRpgNode["proxy host"].getString().value(); + logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host); + group->setHttpProxyHost(http_proxy_host); + if (currRpgNode["proxy user"]) { + auto http_proxy_username = currRpgNode["proxy user"].getString().value(); + logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username); + group->setHttpProxyUserName(http_proxy_username); + } + if (currRpgNode["proxy password"]) { + auto http_proxy_password = currRpgNode["proxy password"].getString().value(); + logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password); + group->setHttpProxyPassWord(http_proxy_password); + } + if (currRpgNode["proxy port"]) { + auto http_proxy_port = currRpgNode["proxy port"].getString().value(); + int32_t port; + if (core::Property::StringToInt(http_proxy_port, port)) { + logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port); + group->setHttpProxyPort(port); + } + } + } + } else if (transport_protocol == "RAW") { + group->setTransportProtocol(transport_protocol); + } else { + std::stringstream stream; + stream << "Invalid transport protocol " << transport_protocol; + throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str()); + } + } + + group->setTransmitting(true); + group->setURL(url); + + checkRequiredField(currRpgNode, "Input Ports", CONFIG_REMOTE_PROCESS_GROUP_KEY); + auto inputPorts = currRpgNode["Input Ports"]; + if (inputPorts && inputPorts.isSequence()) { + for (const auto currPort : inputPorts) { + parsePort(currPort, group.get(), sitetosite::SEND); + } // for node + } + auto outputPorts = currRpgNode["Output Ports"]; 
+ if (outputPorts && outputPorts.isSequence()) { + for (const auto currPort : outputPorts) { + logger_->log_debug("Got a current port, iterating..."); + + parsePort(currPort, group.get(), sitetosite::RECEIVE); + } // for node + } + parentGroup->addProcessGroup(std::move(group)); + } +} + +void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::ProcessGroup* parent_group) { + utils::Identifier port_uuid; + + if (!parent_group) { + logger_->log_error("parseProvenanceReportingYaml: no parent group exists"); + return; + } + + if (!node || node.isNull()) { + logger_->log_debug("no provenance reporting task specified"); + return; + } + + auto reportTask = createProvenanceReportTask(); + + checkRequiredField(node, "scheduling strategy", CONFIG_PROVENANCE_REPORT_KEY); + auto schedulingStrategyStr = node["scheduling strategy"].getString().value(); + checkRequiredField(node, "scheduling period", CONFIG_PROVENANCE_REPORT_KEY); + auto schedulingPeriodStr = node["scheduling period"].getString().value(); + + if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) { + logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count()); + reportTask->setSchedulingPeriodNano(*scheduling_period); + } + + if (schedulingStrategyStr == "TIMER_DRIVEN") { + reportTask->setSchedulingStrategy(core::TIMER_DRIVEN); + logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr); + } else { + throw std::invalid_argument("Invalid scheduling strategy " + schedulingStrategyStr); + } + + int64_t lvalue; + if (node["host"] && node["port"]) { + auto hostStr = node["host"].getString().value(); + + std::string portStr; + if (auto int_val = node["port"].getInt()) { + portStr = std::to_string(int_val.value()); + } else { + portStr = node["port"].getString().value(); + } Review Comment: good idea, added `getIntegerAsString` -- This is an automated message 
from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
