http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 7bf4f58..a11db2b 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -29,8 +29,7 @@ namespace nifi { namespace minifi { namespace core { -core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml( - YAML::Node rootFlowNode) { +core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { uuid_t uuid; checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); @@ -38,18 +37,15 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml( std::string id = getOrGenerateId(&rootFlowNode); uuid_parse(id.c_str(), uuid); - logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, - flowName); - std::unique_ptr<core::ProcessGroup> group = - FlowConfiguration::createRootProcessGroup(flowName, uuid); + logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName); + std::unique_ptr<core::ProcessGroup> group = FlowConfiguration::createRootProcessGroup(flowName, uuid); this->name_ = flowName; return group.release(); } -void YamlConfiguration::parseProcessorNodeYaml( - YAML::Node processorsNode, core::ProcessGroup * parentGroup) { +void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::ProcessGroup * parentGroup) { int64_t schedulingPeriod = -1; int64_t penalizationPeriod = -1; int64_t yieldPeriod = -1; @@ -65,8 +61,7 @@ void YamlConfiguration::parseProcessorNodeYaml( if (processorsNode) { if (processorsNode.IsSequence()) { // Evaluate sequence of processors - for (YAML::const_iterator iter = processorsNode.begin(); - iter != processorsNode.end(); ++iter) { + for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) { core::ProcessorConfig procCfg; YAML::Node procNode = iter->as<YAML::Node>(); @@ -74,28 +69,23 @@ void YamlConfiguration::parseProcessorNodeYaml( procCfg.name = procNode["name"].as<std::string>(); procCfg.id = getOrGenerateId(&procNode); uuid_parse(procCfg.id.c_str(), uuid); - logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", - procCfg.name, procCfg.id); + logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id); checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY); procCfg.javaClass = procNode["class"].as<std::string>(); - logger_->log_debug("parseProcessorNode: class => [%s]", - procCfg.javaClass); + logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass); // Determine the processor name only from the Java class int lastOfIdx = procCfg.javaClass.find_last_of("."); if (lastOfIdx != std::string::npos) { lastOfIdx++; // if a value is found, increment to move beyond the . int nameLength = procCfg.javaClass.length() - lastOfIdx; - std::string processorName = procCfg.javaClass.substr(lastOfIdx, - nameLength); + std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength); processor = this->createProcessor(processorName, uuid); } if (!processor) { - logger_->log_error("Could not create a processor %s with id %s", - procCfg.name, procCfg.id); - throw std::invalid_argument( - "Could not create processor " + procCfg.name); + logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id); + throw std::invalid_argument("Could not create processor " + procCfg.name); } processor->setName(procCfg.name); @@ -131,11 +121,8 @@ void YamlConfiguration::parseProcessorNodeYaml( if (procNode["auto-terminated relationships list"]) { YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"]; std::vector<std::string> rawAutoTerminatedRelationshipValues; - if (autoTerminatedSequence.IsSequence() - && !autoTerminatedSequence.IsNull() - && autoTerminatedSequence.size() > 0) { - for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); - relIter != autoTerminatedSequence.end(); ++relIter) { + if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) { + for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) { std::string autoTerminatedRel = relIter->as<std::string>(); rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); } @@ -151,20 +138,17 @@ void YamlConfiguration::parseProcessorNodeYaml( // Take care of scheduling core::TimeUnit unit; - if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) - && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { + if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); processor->setSchedulingPeriodNano(schedulingPeriod); } - if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) - && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { + if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod); processor->setPenalizationPeriodMsec(penalizationPeriod); } - if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) - && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { + if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); processor->setYieldPeriodMsec(yieldPeriod); } @@ -174,16 +158,13 @@ void YamlConfiguration::parseProcessorNodeYaml( if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { processor->setSchedulingStrategy(core::TIMER_DRIVEN); - logger_->log_debug("setting scheduling strategy as %s", - procCfg.schedulingStrategy); + logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { processor->setSchedulingStrategy(core::EVENT_DRIVEN); - logger_->log_debug("setting scheduling strategy as %s", - procCfg.schedulingStrategy); + logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); } else { processor->setSchedulingStrategy(core::CRON_DRIVEN); - logger_->log_debug("setting scheduling strategy as %s", - procCfg.schedulingStrategy); + logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); } int64_t maxConcurrentTasks; @@ -209,18 +190,15 @@ void YamlConfiguration::parseProcessorNodeYaml( parentGroup->addProcessor(processor); } } else { - throw new std::invalid_argument( - "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); + throw new std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); } } else { - throw new std::invalid_argument( - "Cannot instantiate a MiNiFi instance without a defined " - "Processors configuration node."); + throw new std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined " + "Processors configuration node."); } } -void YamlConfiguration::parseRemoteProcessGroupYaml( - YAML::Node *rpgNode, core::ProcessGroup * parentGroup) { +void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::ProcessGroup * parentGroup) { uuid_t uuid; std::string id; @@ -231,8 +209,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml( if (rpgNode) { if (rpgNode->IsSequence()) { - for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); - ++iter) { + for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) { YAML::Node currRpgNode = iter->as<YAML::Node>(); checkRequiredField(&currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); @@ -258,12 +235,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml( std::string yieldPeriod = currRpgNode["yield period"].as<std::string>(); logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod); - if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) - && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, - yieldPeriodValue) && group) { - logger_->log_debug( - "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", - yieldPeriodValue); + if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) { + logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue); group->setYieldPeriodMsec(yieldPeriodValue); } } @@ -272,12 +245,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml( std::string timeout = currRpgNode["timeout"].as<std::string>(); logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout); - if (core::Property::StringToTime(timeout, timeoutValue, unit) - && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, - timeoutValue) && group) { - logger_->log_debug( - "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", - timeoutValue); + if (core::Property::StringToTime(timeout, timeoutValue, unit) && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) { + logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue); group->setTimeOut(timeoutValue); } } @@ -288,8 +257,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml( checkRequiredField(&currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>(); if (inputPorts && inputPorts.IsSequence()) { - for (YAML::const_iterator portIter = inputPorts.begin(); - portIter != inputPorts.end(); ++portIter) { + for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) { logger_->log_debug("Got a current port, iterating..."); YAML::Node currPort = portIter->as<YAML::Node>(); @@ -299,8 +267,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml( } YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>(); if (outputPorts && outputPorts.IsSequence()) { - for (YAML::const_iterator portIter = outputPorts.begin(); - portIter != outputPorts.end(); ++portIter) { + for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) { logger_->log_debug("Got a current port, iterating..."); YAML::Node currPort = portIter->as<YAML::Node>(); @@ -313,8 +280,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml( } } -void YamlConfiguration::parseProvenanceReportingYaml( - YAML::Node *reportNode, core::ProcessGroup * parentGroup) { +void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup * parentGroup) { uuid_t port_uuid; int64_t schedulingPeriod = -1; @@ -330,9 +296,7 @@ void YamlConfiguration::parseProvenanceReportingYaml( std::shared_ptr<core::Processor> processor = nullptr; processor = createProvenanceReportTask(); - std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask = - std::static_pointer_cast< - core::reporting::SiteToSiteProvenanceReportingTask>(processor); + std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask = std::static_pointer_cast<core::reporting::SiteToSiteProvenanceReportingTask>(processor); YAML::Node node = reportNode->as<YAML::Node>(); @@ -354,21 +318,16 @@ void YamlConfiguration::parseProvenanceReportingYaml( processor->setScheduledState(core::RUNNING); core::TimeUnit unit; - if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) - && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, - schedulingPeriod)) { - logger_->log_debug("ProvenanceReportingTask schedulingPeriod %d ns", - schedulingPeriod); + if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { + logger_->log_debug("ProvenanceReportingTask schedulingPeriod %d ns", schedulingPeriod); processor->setSchedulingPeriodNano(schedulingPeriod); } if (schedulingStrategyStr == "TIMER_DRIVEN") { processor->setSchedulingStrategy(core::TIMER_DRIVEN); - logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", - schedulingStrategyStr); + logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr); } else { - throw std::invalid_argument( - "Invalid scheduling strategy " + schedulingStrategyStr); + throw std::invalid_argument("Invalid scheduling strategy " + schedulingStrategyStr); } reportTask->setHost(hostStr); @@ -387,19 +346,18 @@ void YamlConfiguration::parseProvenanceReportingYaml( } } -void YamlConfiguration::parseControllerServices( - YAML::Node *controllerServicesNode) { +void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNode) { if (!IsNullOrEmpty(controllerServicesNode)) { if (controllerServicesNode->IsSequence()) { for (auto iter : *controllerServicesNode) { YAML::Node controllerServiceNode = iter.as<YAML::Node>(); try { checkRequiredField(&controllerServiceNode, "name", - CONFIG_YAML_CONTROLLER_SERVICES_KEY); + CONFIG_YAML_CONTROLLER_SERVICES_KEY); checkRequiredField(&controllerServiceNode, "id", - CONFIG_YAML_CONTROLLER_SERVICES_KEY); + CONFIG_YAML_CONTROLLER_SERVICES_KEY); checkRequiredField(&controllerServiceNode, "class", - CONFIG_YAML_CONTROLLER_SERVICES_KEY); + CONFIG_YAML_CONTROLLER_SERVICES_KEY); auto name = controllerServiceNode["name"].as<std::string>(); auto id = controllerServiceNode["id"].as<std::string>(); @@ -407,42 +365,28 @@ void YamlConfiguration::parseControllerServices( uuid_t uuid; uuid_parse(id.c_str(), uuid); - auto controller_service_node = createControllerService(type, name, - uuid); + auto controller_service_node = createControllerService(type, name, uuid); if (nullptr != controller_service_node) { - logger_->log_debug( - "Created Controller Service with UUID %s and name %s", id, - name); + logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name); controller_service_node->initialize(); YAML::Node propertiesNode = controllerServiceNode["Properties"]; // we should propogate propertiets to the node and to the implementation - parsePropertiesNodeYaml( - &propertiesNode, - std::static_pointer_cast<core::ConfigurableComponent>( - controller_service_node)); - if (controller_service_node->getControllerServiceImplementation() - != nullptr) { - parsePropertiesNodeYaml( - &propertiesNode, - std::static_pointer_cast<core::ConfigurableComponent>( - controller_service_node - ->getControllerServiceImplementation())); + parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node)); + if (controller_service_node->getControllerServiceImplementation() != nullptr) { + parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(controller_service_node->getControllerServiceImplementation())); } } controller_services_->put(id, controller_service_node); controller_services_->put(name, controller_service_node); } catch (YAML::InvalidNode &in) { - throw Exception( - ExceptionType::GENERAL_EXCEPTION, - "Name, id, and class must be specified for controller services"); + throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services"); } } } } } -void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, - core::ProcessGroup *parent) { +void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::ProcessGroup *parent) { if (!parent) { logger_->log_error("parseProcessNode: no parent group was provided"); return; @@ -450,8 +394,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, if (connectionsNode) { if (connectionsNode->IsSequence()) { - for (YAML::const_iterator iter = connectionsNode->begin(); - iter != connectionsNode->end(); ++iter) { + for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) { YAML::Node connectionNode = iter->as<YAML::Node>(); std::shared_ptr<minifi::Connection> connection = nullptr; @@ -462,15 +405,13 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, std::string id = getOrGenerateId(&connectionNode); uuid_parse(id.c_str(), uuid); connection = this->createConnection(name, uuid); - logger_->log_debug("Created connection with UUID %s and name %s", id, - name); + logger_->log_debug("Created connection with UUID %s and name %s", id, name); // Configure connection source checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY); auto rawRelationship = connectionNode["source relationship name"].as<std::string>(); core::Relationship relationship(rawRelationship, ""); - logger_->log_debug("parseConnection: relationship => [%s]", - rawRelationship); + logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship); if (connection) { connection->setRelationship(relationship); } @@ -481,7 +422,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, std::string connectionSrcProcId = connectionNode["source id"].as<std::string>(); uuid_parse(connectionSrcProcId.c_str(), srcUUID); logger_->log_debug("Using 'source id' to match source with same id for " - "connection '%s': source id => [%s]", name, connectionSrcProcId); + "connection '%s': source id => [%s]", + name, connectionSrcProcId); } else { // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY); @@ -491,20 +433,20 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, // the source name is a remote port id, so use that as the source id uuid_copy(srcUUID, tmpUUID); logger_->log_debug("Using 'source name' containing a remote port id to match the source for " - "connection '%s': source name => [%s]", name, connectionSrcProcName); + "connection '%s': source name => [%s]", + name, connectionSrcProcName); } else { // lastly, look the processor up by name auto srcProcessor = parent->findProcessor(connectionSrcProcName); if (NULL != srcProcessor) { srcProcessor->getUUID(srcUUID); logger_->log_debug("Using 'source name' to match source with same name for " - "connection '%s': source name => [%s]", name, connectionSrcProcName); + "connection '%s': source name => [%s]", + name, connectionSrcProcName); } else { // we ran out of ways to discover the source processor - logger_->log_error( - "Could not locate a source with name %s to create a connection", connectionSrcProcName); - throw std::invalid_argument( - "Could not locate a source with name " + connectionSrcProcName + " to create a connection "); + logger_->log_error("Could not locate a source with name %s to create a connection", connectionSrcProcName); + throw std::invalid_argument("Could not locate a source with name " + connectionSrcProcName + " to create a connection "); } } } @@ -516,7 +458,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, std::string connectionDestProcId = connectionNode["destination id"].as<std::string>(); uuid_parse(connectionDestProcId.c_str(), destUUID); logger_->log_debug("Using 'destination id' to match destination with same id for " - "connection '%s': destination id => [%s]", name, connectionDestProcId); + "connection '%s': destination id => [%s]", + name, connectionDestProcId); } else { // we use the same logic as above for resolving the source processor // for looking up the destination processor in absence of a processor id @@ -524,24 +467,24 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, std::string connectionDestProcName = connectionNode["destination name"].as<std::string>(); uuid_t tmpUUID; if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) && - NULL != parent->findProcessor(tmpUUID)) { + NULL != parent->findProcessor(tmpUUID)) { // the destination name is a remote port id, so use that as the dest id uuid_copy(destUUID, tmpUUID); logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for " - "connection '%s': destination name => [%s]", name, connectionDestProcName); + "connection '%s': destination name => [%s]", + name, connectionDestProcName); } else { // look the processor up by name auto destProcessor = parent->findProcessor(connectionDestProcName); if (NULL != destProcessor) { destProcessor->getUUID(destUUID); logger_->log_debug("Using 'destination name' to match destination with same name for " - "connection '%s': destination name => [%s]", name, connectionDestProcName); + "connection '%s': destination name => [%s]", + name, connectionDestProcName); } else { // we ran out of ways to discover the destination processor - logger_->log_error( - "Could not locate a destination with name %s to create a connection", connectionDestProcName); - throw std::invalid_argument( - "Could not locate a destination with name " + connectionDestProcName + " to create a connection"); + logger_->log_error("Could not locate a destination with name %s to create a connection", connectionDestProcName); + throw std::invalid_argument("Could not locate a destination with name " + connectionDestProcName + " to create a connection"); } } } @@ -555,9 +498,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, } } -void YamlConfiguration::parsePortYaml(YAML::Node *portNode, - core::ProcessGroup *parent, - TransferDirection direction) { +void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, TransferDirection direction) { uuid_t uuid; std::shared_ptr<core::Processor> processor = NULL; std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL; @@ -572,16 +513,13 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, // Check for required fields checkRequiredField(&inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); auto nameStr = inputPortsObj["name"].as<std::string>(); - checkRequiredField( - &inputPortsObj, - "id", - CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY, - "The field 'id' is required for " - "the port named '" + nameStr - + "' in the YAML Config. If this port " - "is an input port for a NiFi Remote Process Group, the port " - "id should match the corresponding id specified in the NiFi configuration. " - "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX."); + checkRequiredField(&inputPortsObj, "id", + CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY, + "The field 'id' is required for " + "the port named '" + nameStr + "' in the YAML Config. If this port " + "is an input port for a NiFi Remote Process Group, the port " + "id should match the corresponding id specified in the NiFi configuration. " + "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX."); auto portId = inputPortsObj["id"].as<std::string>(); uuid_parse(portId.c_str(), uuid); @@ -597,9 +535,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, // handle port properties YAML::Node nodeVal = portNode->as<YAML::Node>(); YAML::Node propertiesNode = nodeVal["Properties"]; - parsePropertiesNodeYaml( - &propertiesNode, - std::static_pointer_cast<core::ConfigurableComponent>(processor)); + parsePropertiesNodeYaml(&propertiesNode, std::static_pointer_cast<core::ConfigurableComponent>(processor)); // add processor to parent parent->addProcessor(processor); @@ -616,12 +552,9 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, } } -void YamlConfiguration::parsePropertiesNodeYaml( - YAML::Node *propertiesNode, - std::shared_ptr<core::ConfigurableComponent> processor) { +void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode, std::shared_ptr<core::ConfigurableComponent> processor) { // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated - for (YAML::const_iterator propsIter = propertiesNode->begin(); - propsIter != propertiesNode->end(); ++propsIter) { + for (YAML::const_iterator propsIter = propertiesNode->begin(); propsIter != propertiesNode->end(); ++propsIter) { std::string propertyName = propsIter->first.as<std::string>(); YAML::Node propertyValueNode = propsIter->second; if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) { @@ -634,12 +567,9 @@ void YamlConfiguration::parsePropertiesNodeYaml( std::string rawValueString = propertiesNode.as<std::string>(); logger_->log_info("Found %s=%s", propertyName, rawValueString); if (!processor->updateProperty(propertyName, rawValueString)) { - std::shared_ptr<core::Connectable> proc = - std::dynamic_pointer_cast<core::Connectable>(processor); + std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor); if (proc != 0) { - logger_->log_warn( - "Received property %s with value %s but is not one of the properties for %s", - propertyName, rawValueString, proc->getName()); + logger_->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName, rawValueString, proc->getName()); } } } @@ -647,12 +577,9 @@ void YamlConfiguration::parsePropertiesNodeYaml( } else { std::string rawValueString = propertyValueNode.as<std::string>(); if (!processor->setProperty(propertyName, rawValueString)) { - std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast< - core::Connectable>(processor); + std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor); if (proc != 0) { - logger_->log_warn( - "Received property %s with value %s but is not one of the properties for %s", - propertyName, rawValueString, proc->getName()); + logger_->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName, rawValueString, proc->getName()); } } } @@ -660,8 +587,7 @@ void YamlConfiguration::parsePropertiesNodeYaml( } } -std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, - const std::string &idField) { +std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, const std::string &idField) { std::string id; YAML::Node node = yamlNode->as<YAML::Node>(); @@ -669,9 +595,8 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, if (YAML::NodeType::Scalar == node[idField].Type()) { id = node[idField].as<std::string>(); } else { - throw std::invalid_argument( - "getOrGenerateId: idField is expected to reference YAML::Node " - "of YAML::NodeType::Scalar."); + throw std::invalid_argument("getOrGenerateId: idField is expected to reference YAML::Node " + "of YAML::NodeType::Scalar."); } } else { uuid_t uuid; @@ -684,10 +609,7 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, return id; } -void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, - const std::string &fieldName, - const std::string &yamlSection, - const std::string &errorMessage) { +void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, const std::string &fieldName, const std::string &yamlSection, const std::string &errorMessage) { std::string errMsg = errorMessage; if (!yamlNode->as<YAML::Node>()[fieldName]) { if (errMsg.empty()) { @@ -695,11 +617,8 @@ void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, // invalid YAML config file, using the component name if present errMsg = yamlNode->as<YAML::Node>()["name"] ? - "Unable to parse configuration file for component named '" - + yamlNode->as<YAML::Node>()["name"].as<std::string>() - + "' as required field '" + fieldName + "' is missing" : - "Unable to parse configuration file as required field '" - + fieldName + "' is missing"; + "Unable to parse configuration file for component named '" + yamlNode->as<YAML::Node>()["name"].as<std::string>() + "' as required field '" + fieldName + "' is missing" : + "Unable to parse configuration file as required field '" + fieldName + "' is missing"; if (!yamlSection.empty()) { errMsg += " [in '" + yamlSection + "' section of configuration file]"; }
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/BaseStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp index 307f41d..137bf1a 100644 --- a/libminifi/src/io/BaseStream.cpp +++ b/libminifi/src/io/BaseStream.cpp @@ -33,9 +33,7 @@ namespace io { * @return resulting write size **/ int BaseStream::write(uint32_t base_value, bool is_little_endian) { - return Serializable::write(base_value, - reinterpret_cast<DataStream*>(composable_stream_), - is_little_endian); + return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian); } int BaseStream::writeData(uint8_t *value, int size) { @@ -54,9 +52,7 @@ int BaseStream::writeData(uint8_t *value, int size) { * @return resulting write size **/ int BaseStream::write(uint16_t base_value, bool is_little_endian) { - return Serializable::write(base_value, - reinterpret_cast<DataStream*>(composable_stream_), - is_little_endian); + return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian); } /** @@ -67,8 +63,7 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) { * @return resulting write size **/ int BaseStream::write(uint8_t *value, int len) { - return Serializable::write(value, len, - reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::write(value, len, reinterpret_cast<DataStream*>(composable_stream_)); } /** @@ -79,9 +74,7 @@ int BaseStream::write(uint8_t *value, int len) { * @return resulting write size **/ int BaseStream::write(uint64_t base_value, bool is_little_endian) { - return Serializable::write(base_value, - reinterpret_cast<DataStream*>(composable_stream_), - is_little_endian); + return Serializable::write(base_value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian); } /** @@ -100,8 +93,7 @@ int BaseStream::write(bool value) { * @return resulting write size **/ int BaseStream::writeUTF(std::string str, bool widen) { - return Serializable::writeUTF( - str, reinterpret_cast<DataStream*>(composable_stream_), widen); + return Serializable::writeUTF(str, reinterpret_cast<DataStream*>(composable_stream_), widen); } /** @@ -111,8 +103,7 @@ int BaseStream::writeUTF(std::string str, bool widen) { * @return resulting read size **/ int BaseStream::read(uint8_t &value) { - return Serializable::read(value, - reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_)); } /** @@ -122,8 +113,7 @@ int BaseStream::read(uint8_t &value) { * @return resulting read size **/ int BaseStream::read(uint16_t &base_value, bool is_little_endian) { - return Serializable::read(base_value, - reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::read(base_value, reinterpret_cast<DataStream*>(composable_stream_)); } /** @@ -133,8 +123,7 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::read(char &value) { - return Serializable::read(value, - reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_)); } /** @@ -145,8 +134,7 @@ int BaseStream::read(char &value) { * @return resulting read size **/ int BaseStream::read(uint8_t *value, int len) { - return Serializable::read(value, len, - reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::read(value, len, reinterpret_cast<DataStream*>(composable_stream_)); } /** @@ -155,8 +143,7 @@ int BaseStream::read(uint8_t *value, int len) { * @param buflen */ int BaseStream::readData(std::vector<uint8_t> &buf, int buflen) { - return Serializable::read(&buf[0], buflen, - reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::read(&buf[0], buflen, reinterpret_cast<DataStream*>(composable_stream_)); } /** * Reads data and places it into buf @@ -164,8 +151,7 @@ int BaseStream::readData(std::vector<uint8_t> &buf, int buflen) { * @param buflen */ int BaseStream::readData(uint8_t *buf, int buflen) { - return Serializable::read(buf, buflen, - reinterpret_cast<DataStream*>(composable_stream_)); + return Serializable::read(buf, buflen, reinterpret_cast<DataStream*>(composable_stream_)); } /** @@ -175,9 +161,7 @@ int BaseStream::readData(uint8_t *buf, int buflen) { * @return resulting read size **/ int BaseStream::read(uint32_t &value, bool is_little_endian) { - return Serializable::read(value, - reinterpret_cast<DataStream*>(composable_stream_), - is_little_endian); + return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian); } /** @@ -187,9 +171,7 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::read(uint64_t &value, bool is_little_endian) { - return Serializable::read(value, - reinterpret_cast<DataStream*>(composable_stream_), - is_little_endian); + return Serializable::read(value, reinterpret_cast<DataStream*>(composable_stream_), is_little_endian); } /** @@ -199,8 +181,7 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::readUTF(std::string &str, bool widen) { - return Serializable::readUTF( - str, reinterpret_cast<DataStream*>(composable_stream_), widen); + return Serializable::readUTF(str, reinterpret_cast<DataStream*>(composable_stream_), widen); } } /* namespace io */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/ClientSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index bd99bc7..57d6f03 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -39,8 +39,7 @@ namespace nifi { namespace minifi { namespace io { -Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, - const uint16_t listeners = -1) +Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners = -1) : requested_hostname_(hostname), port_(port), addr_info_(0), @@ -86,8 +85,7 @@ void Socket::closeStream() { } int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { - if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, - p->ai_protocol)) == -1) { + if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { logger_->log_error("error while connecting to server socket"); return -1; } @@ -111,8 +109,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { sa_loc->sin_port = htons(port_); // use any address if you are connecting to the local machine for testing // otherwise we must use the requested hostname - if (IsNullOrEmpty(requested_hostname_) - || requested_hostname_ == "localhost") { + if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == "localhost") { sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); } else { sa_loc->sin_addr.s_addr = addr; @@ -149,12 +146,10 @@ int16_t Socket::initialize() { hints.ai_flags |= AI_PASSIVE; hints.ai_protocol = 0; /* any protocol */ - int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, - &addr_info_); + int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_); if (errcode != 0) { - logger_->log_error("Saw error during getaddrinfo, error: %s", - strerror(errno)); + logger_->log_error("Saw error during getaddrinfo, error: %s", strerror(errno)); return -1; } @@ -210,8 +205,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) { retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL); if (retval < 0) { - logger_->log_error("Saw error during selection, error:%i %s", retval, - strerror(errno)); + logger_->log_error("Saw error during selection, error:%i %s", retval, strerror(errno)); return retval; } @@ -221,8 +215,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) { if (listeners_ > 0) { struct sockaddr_storage remoteaddr; // client address socklen_t addrlen = sizeof remoteaddr; - int newfd = accept(socket_file_descriptor_, - (struct sockaddr *) &remoteaddr, &addrlen); + int newfd = accept(socket_file_descriptor_, (struct sockaddr *) &remoteaddr, &addrlen); FD_SET(newfd, &total_list_); // add to master set if (newfd > socket_max_) { // keep track of the max socket_max_ = newfd; @@ -273,8 +266,7 @@ int16_t Socket::setSocketOptions(const int sock) { #else if (listeners_ > 0) { // lose the pesky "address already in use" error message - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) { + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) { logger_->log_error("setsockopt() SO_REUSEADDR failed"); close(sock); return -1; @@ -304,16 +296,14 @@ int Socket::writeData(uint8_t *value, int size) { // check for errors if (ret <= 0) { close(socket_file_descriptor_); - logger_->log_error("Could not send to %d, error: %s", - socket_file_descriptor_, strerror(errno)); + logger_->log_error("Could not send to %d, error: %s", socket_file_descriptor_, strerror(errno)); return ret; } bytes += ret; } if (ret) - logger_->log_trace("Send data size %d over socket %d", size, - socket_file_descriptor_); + logger_->log_trace("Send data size %d over socket %d", size, socket_file_descriptor_); return bytes; } @@ -341,15 +331,11 @@ int Socket::read(uint64_t &value, bool is_little_endian) { auto buf = readBuffer(value); if (is_little_endian) { - value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) - | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) - | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16) - | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0); + value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24) + | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0); } else { - value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) - | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) - | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40) - | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56); + value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32) + | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56); } return sizeof(value); } @@ -397,8 +383,7 @@ int Socket::readData(uint8_t *buf, int buflen) { if (bytes_read == 0) { logger_->log_info("Other side hung up on %d", fd); } else { - logger_->log_error("Could not recv on %d, error: %s", fd, - strerror(errno)); + logger_->log_error("Could not recv on %d, error: %s", fd, strerror(errno)); } return -1; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/DataStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp index 9e0dfce..92c7cda 100644 --- a/libminifi/src/io/DataStream.cpp +++ b/libminifi/src/io/DataStream.cpp @@ -44,15 +44,11 @@ int DataStream::read(uint64_t &value, bool is_little_endian) { uint8_t *buf = &buffer[readBuffer]; if (is_little_endian) { - value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) - | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) - | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16) - | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0); + value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24) + | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0); } else { - value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) - | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) - | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40) - | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56); + value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32) + | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56); } readBuffer += 8; return 8; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/Serializable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp index c3f74c7..5e57c80 100644 --- a/libminifi/src/io/Serializable.cpp +++ b/libminifi/src/io/Serializable.cpp @@ -35,26 +35,20 @@ namespace io { template<typename T> int Serializable::writeData(const T &t, DataStream *stream) { uint8_t bytes[sizeof t]; - std::copy(static_cast<const char*>(static_cast<const void*>(&t)), - static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, - bytes); + std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes); return stream->writeData(bytes, sizeof t); } template<typename T> int Serializable::writeData(const T &t, uint8_t *to_vec) { - std::copy(static_cast<const char*>(static_cast<const void*>(&t)), - static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, - to_vec); + std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, to_vec); return sizeof t; } template<typename T> int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) { uint8_t bytes[sizeof t]; - std::copy(static_cast<const char*>(static_cast<const void*>(&t)), - static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, - bytes); + std::copy(static_cast<const char*>(static_cast<const void*>(&t)), static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, bytes); to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]); return sizeof t; } @@ -97,36 +91,29 @@ int Serializable::read(uint8_t *value, int len, DataStream *stream) { return stream->readData(value, len); } -int Serializable::read(uint16_t &value, DataStream *stream, - bool is_little_endian) { +int Serializable::read(uint16_t &value, DataStream *stream, bool is_little_endian) { return stream->read(value, is_little_endian); } -int Serializable::read(uint32_t &value, DataStream *stream, - bool is_little_endian) { +int Serializable::read(uint32_t &value, DataStream *stream, bool is_little_endian) { return stream->read(value, is_little_endian); } -int Serializable::read(uint64_t &value, DataStream *stream, - bool is_little_endian) { +int Serializable::read(uint64_t &value, DataStream *stream, bool is_little_endian) { return stream->read(value, is_little_endian); } -int Serializable::write(uint32_t base_value, DataStream *stream, - bool is_little_endian) { +int Serializable::write(uint32_t base_value, DataStream *stream, bool is_little_endian) { const uint32_t value = is_little_endian ? htonl(base_value) : base_value; return writeData(value, stream); } -int Serializable::write(uint64_t base_value, DataStream *stream, - bool is_little_endian) { - const uint64_t value = - is_little_endian == 1 ? htonll_r(base_value) : base_value; +int Serializable::write(uint64_t base_value, DataStream *stream, bool is_little_endian) { + const uint64_t value = is_little_endian == 1 ? htonll_r(base_value) : base_value; return writeData(value, stream); } -int Serializable::write(uint16_t base_value, DataStream *stream, - bool is_little_endian) { +int Serializable::write(uint16_t base_value, DataStream *stream, bool is_little_endian) { const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value; return writeData(value, stream); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/io/tls/TLSSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp index b269cef..40cf1e9 100644 --- a/libminifi/src/io/tls/TLSSocket.cpp +++ b/libminifi/src/io/tls/TLSSocket.cpp @@ -57,9 +57,7 @@ int16_t TLSContext::initialize() { std::string clientAuthStr; bool needClientCert = true; - if (!(configure_->get(Configure::nifi_security_need_ClientAuth, clientAuthStr) - && org::apache::nifi::minifi::utils::StringUtils::StringToBool( - clientAuthStr, needClientCert))) { + if (!(configure_->get(Configure::nifi_security_need_ClientAuth, clientAuthStr) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, needClientCert))) { needClientCert = true; } @@ -67,8 +65,7 @@ int16_t TLSContext::initialize() { method = TLSv1_2_client_method(); ctx = SSL_CTX_new(method); if (ctx == NULL) { - logger_->log_error("Could not create SSL context, error: %s.", - std::strerror(errno)); + logger_->log_error("Could not create SSL context, error: %s.", std::strerror(errno)); error_value = TLS_ERROR_CONTEXT; return error_value; } @@ -78,56 +75,40 @@ int16_t TLSContext::initialize() { std::string passphrase; std::string caCertificate; - if (!(configure_->get(Configure::nifi_security_client_certificate, - certificate) - && configure_->get(Configure::nifi_security_client_private_key, - privatekey))) { - logger_->log_error( - "Certificate and Private Key PEM file not configured, error: %s.", - std::strerror(errno)); + if (!(configure_->get(Configure::nifi_security_client_certificate, certificate) && configure_->get(Configure::nifi_security_client_private_key, privatekey))) { + logger_->log_error("Certificate and Private Key PEM file not configured, error: %s.", std::strerror(errno)); error_value = TLS_ERROR_PEM_MISSING; return error_value; } // load certificates and private key in PEM format - if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) - <= 0) { - logger_->log_error("Could not create load certificate, error : %s", - std::strerror(errno)); + if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) { + logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno)); error_value = TLS_ERROR_CERT_MISSING; return error_value; } - if (configure_->get(Configure::nifi_security_client_pass_phrase, - passphrase)) { + if (configure_->get(Configure::nifi_security_client_pass_phrase, passphrase)) { // if the private key has passphase SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); - SSL_CTX_set_default_passwd_cb_userdata( - ctx, static_cast<void*>(configure_.get())); + SSL_CTX_set_default_passwd_cb_userdata(ctx, static_cast<void*>(configure_.get())); } - int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), - SSL_FILETYPE_PEM); + int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), SSL_FILETYPE_PEM); if (retp != 1) { - logger_->log_error( - "Could not create load private key,%i on %s error : %s", retp, - privatekey.c_str(), std::strerror(errno)); + logger_->log_error("Could not create load private key,%i on %s error : %s", retp, privatekey.c_str(), std::strerror(errno)); error_value = TLS_ERROR_KEY_ERROR; return error_value; } // verify private key if (!SSL_CTX_check_private_key(ctx)) { - logger_->log_error( - "Private key does not match the public certificate, error : %s", - std::strerror(errno)); + logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno)); error_value = TLS_ERROR_KEY_ERROR; return error_value; } // load CA certificates - if (configure_->get(Configure::nifi_security_client_ca_certificate, - caCertificate)) { + if (configure_->get(Configure::nifi_security_client_ca_certificate, caCertificate)) { retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0); if (retp == 0) { - logger_->log_error("Can not load CA certificate, Exiting, error : %s", - std::strerror(errno)); + logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno)); error_value = TLS_ERROR_CERT_ERROR; return error_value; } @@ -149,24 +130,24 @@ TLSSocket::~TLSSocket() { * @param port connecting port * @param listeners number of listeners in the queue */ -TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, - const std::string &hostname, const uint16_t port, - const uint16_t listeners) +TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners) : Socket(context, hostname, port, listeners), - ssl(0), logger_(logging::LoggerFactory<TLSSocket>::getLogger()) { + ssl(0), + logger_(logging::LoggerFactory<TLSSocket>::getLogger()) { context_ = context; } -TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, - const std::string &hostname, const uint16_t port) +TLSSocket::TLSSocket(const std::shared_ptr<TLSContext> &context, const std::string &hostname, const uint16_t port) : Socket(context, hostname, port, 0), - ssl(0), logger_(logging::LoggerFactory<TLSSocket>::getLogger()) { + ssl(0), + logger_(logging::LoggerFactory<TLSSocket>::getLogger()) { context_ = context; } TLSSocket::TLSSocket(const TLSSocket &&d) : Socket(std::move(d)), - ssl(0), logger_(std::move(d.logger_)) { + ssl(0), + logger_(std::move(d.logger_)) { context_ = d.context_; } @@ -178,15 +159,13 @@ int16_t TLSSocket::initialize() { ssl = SSL_new(context_->getContext()); SSL_set_fd(ssl, socket_file_descriptor_); if (SSL_connect(ssl) == -1) { - logger_->log_error("SSL socket connect failed to %s %d", - requested_hostname_.c_str(), port_); + logger_->log_error("SSL socket connect failed to %s %d", requested_hostname_.c_str(), port_); SSL_free(ssl); ssl = NULL; close(socket_file_descriptor_); return -1; } else { - logger_->log_info("SSL socket connect success to %s %d", - requested_hostname_.c_str(), port_); + logger_->log_info("SSL socket connect success to %s %d", requested_hostname_.c_str(), port_); return 0; } } @@ -213,8 +192,7 @@ int TLSSocket::writeData(uint8_t *value, int size) { sent = SSL_write(ssl, value + bytes, size - bytes); // check for errors if (sent < 0) { - logger_->log_error("Site2Site Peer socket %d send failed %s", - socket_file_descriptor_, strerror(errno)); + logger_->log_error("Site2Site Peer socket %d send failed %s", socket_file_descriptor_, strerror(errno)); return sent; } bytes += sent; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/AppendHostInfo.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp index b3c76db..bbc86a5 100644 --- a/libminifi/src/processors/AppendHostInfo.cpp +++ b/libminifi/src/processors/AppendHostInfo.cpp @@ -46,19 +46,10 @@ namespace processors { #define HOST_NAME_MAX 255 #endif -core::Property AppendHostInfo::InterfaceName( - "Network Interface Name", - "Network interface from which to read an IP v4 address", "eth0"); -core::Property AppendHostInfo::HostAttribute( - "Hostname Attribute", - "Flowfile attribute to used to record the agent's hostname", - "source.hostname"); -core::Property AppendHostInfo::IPAttribute( - "IP Attribute", - "Flowfile attribute to used to record the agent's IP address", - "source.ipv4"); -core::Relationship AppendHostInfo::Success( - "success", "success operational on the flow record"); +core::Property AppendHostInfo::InterfaceName("Network Interface Name", "Network interface from which to read an IP v4 address", "eth0"); +core::Property AppendHostInfo::HostAttribute("Hostname Attribute", "Flowfile attribute to used to record the agent's hostname", "source.hostname"); +core::Property AppendHostInfo::IPAttribute("IP Attribute", "Flowfile attribute to used to record the agent's IP address", "source.ipv4"); +core::Relationship AppendHostInfo::Success("success", "success operational on the flow record"); void AppendHostInfo::initialize() { // Set the supported properties @@ -74,8 +65,7 @@ void AppendHostInfo::initialize() { setSupportedRelationships(relationships); } -void AppendHostInfo::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { +void AppendHostInfo::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::shared_ptr<core::FlowFile> flow = session->get(); if (!flow) return; @@ -84,8 +74,7 @@ void AppendHostInfo::onTrigger(core::ProcessContext *context, std::string hostAttribute = ""; context->getProperty(HostAttribute.getName(), hostAttribute); - flow->addAttribute(hostAttribute.c_str(), - org::apache::nifi::minifi::io::Socket::getMyHostName()); + flow->addAttribute(hostAttribute.c_str(), org::apache::nifi::minifi::io::Socket::getMyHostName()); // Get IP address for the specified interface std::string iface; @@ -103,9 +92,7 @@ void AppendHostInfo::onTrigger(core::ProcessContext *context, std::string ipAttribute; context->getProperty(IPAttribute.getName(), ipAttribute); - flow->addAttribute( - ipAttribute.c_str(), - inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr)); + flow->addAttribute(ipAttribute.c_str(), inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr)); } // Transfer to the relationship http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/ExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp index 701c645..323d69a 100644 --- a/libminifi/src/processors/ExecuteProcess.cpp +++ b/libminifi/src/processors/ExecuteProcess.cpp @@ -33,31 +33,18 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ExecuteProcess::Command( - "Command", - "Specifies the command to be executed; if just the name of an executable" - " is provided, it must be in the user's environment PATH.", - ""); -core::Property ExecuteProcess::CommandArguments( - "Command Arguments", - "The arguments to supply to the executable delimited by white space. White " - "space can be escaped by enclosing it in double-quotes.", - ""); -core::Property ExecuteProcess::WorkingDir( - "Working Directory", - "The directory to use as the current working directory when executing the command", - ""); -core::Property ExecuteProcess::BatchDuration( - "Batch Duration", - "If the process is expected to be long-running and produce textual output, a " - "batch duration can be specified.", - "0"); -core::Property ExecuteProcess::RedirectErrorStream( - "Redirect Error Stream", - "If true will redirect any error stream output of the process to the output stream.", - "false"); -core::Relationship ExecuteProcess::Success( - "success", "All created FlowFiles are routed to this relationship."); +core::Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable" + " is provided, it must be in the user's environment PATH.", + ""); +core::Property ExecuteProcess::CommandArguments("Command Arguments", "The arguments to supply to the executable delimited by white space. White " + "space can be escaped by enclosing it in double-quotes.", + ""); +core::Property ExecuteProcess::WorkingDir("Working Directory", "The directory to use as the current working directory when executing the command", ""); +core::Property ExecuteProcess::BatchDuration("Batch Duration", "If the process is expected to be long-running and produce textual output, a " + "batch duration can be specified.", + "0"); +core::Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream", "If true will redirect any error stream output of the process to the output stream.", "false"); +core::Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship."); void ExecuteProcess::initialize() { // Set the supported properties @@ -74,8 +61,7 @@ void ExecuteProcess::initialize() { setSupportedRelationships(relationships); } -void ExecuteProcess::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { +void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::string value; if (context->getProperty(Command.getName(), value)) { this->_command = value; @@ -88,15 +74,12 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, } if (context->getProperty(BatchDuration.getName(), value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, _batchDuration, unit) - && core::Property::ConvertTimeUnitToMS(_batchDuration, unit, - _batchDuration)) { + if (core::Property::StringToTime(value, _batchDuration, unit) && core::Property::ConvertTimeUnitToMS(_batchDuration, unit, _batchDuration)) { logger_->log_info("Setting _batchDuration"); } } if (context->getProperty(RedirectErrorStream.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool( - value, _redirectErrorStream); + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, _redirectErrorStream); } this->_fullCommand = _command + " " + _commandArgument; if (_fullCommand.length() == 0) { @@ -106,8 +89,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, if (_workingDir.length() > 0 && _workingDir != ".") { // change to working directory if (chdir(_workingDir.c_str()) != 0) { - logger_->log_error("Execute Command can not chdir %s", - _workingDir.c_str()); + logger_->log_error("Execute Command can not chdir %s", _workingDir.c_str()); yield(); return; } @@ -156,21 +138,18 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, close(_pipefd[1]); if (_batchDuration > 0) { while (1) { - std::this_thread::sleep_for( - std::chrono::milliseconds(_batchDuration)); + std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration)); char buffer[4096]; int numRead = read(_pipefd[0], buffer, sizeof(buffer)); if (numRead <= 0) break; logger_->log_info("Execute Command Respond %d", numRead); ExecuteProcess::WriteCallback callback(buffer, numRead); - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< - FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (!flowFile) continue; flowFile->addAttribute("command", _command.c_str()); - flowFile->addAttribute("command.arguments", - _commandArgument.c_str()); + flowFile->addAttribute("command.arguments", _commandArgument.c_str()); session->write(flowFile, &callback); session->transfer(flowFile, Success); session->commit(); @@ -181,21 +160,18 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, int totalRead = 0; std::shared_ptr<FlowFileRecord> flowFile = nullptr; while (1) { - int numRead = read(_pipefd[0], bufPtr, - (sizeof(buffer) - totalRead)); + int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead)); if (numRead <= 0) { if (totalRead > 0) { logger_->log_info("Execute Command Respond %d", totalRead); // child exits and close the pipe ExecuteProcess::WriteCallback callback(buffer, totalRead); if (!flowFile) { - flowFile = std::static_pointer_cast<FlowFileRecord>( - session->create()); + flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (!flowFile) break; flowFile->addAttribute("command", _command.c_str()); - flowFile->addAttribute("command.arguments", - _commandArgument.c_str()); + flowFile->addAttribute("command.arguments", _commandArgument.c_str()); session->write(flowFile, &callback); } else { session->append(flowFile, &callback); @@ -206,17 +182,14 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, } else { if (numRead == (sizeof(buffer) - totalRead)) { // we reach the max buffer size - logger_->log_info("Execute Command Max Respond %d", - sizeof(buffer)); + logger_->log_info("Execute Command Max Respond %d", sizeof(buffer)); ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer)); if (!flowFile) { - flowFile = std::static_pointer_cast<FlowFileRecord>( - session->create()); + flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (!flowFile) continue; flowFile->addAttribute("command", _command.c_str()); - flowFile->addAttribute("command.arguments", - _commandArgument.c_str()); + flowFile->addAttribute("command.arguments", _commandArgument.c_str()); session->write(flowFile, &callback); } else { session->append(flowFile, &callback); @@ -234,11 +207,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, died = wait(&status); if (WIFEXITED(status)) { - logger_->log_info("Execute Command Complete %s status %d pid %d", - _fullCommand.c_str(), WEXITSTATUS(status), _pid); + logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid); } else { - logger_->log_info("Execute Command Complete %s status %d pid %d", - _fullCommand.c_str(), WTERMSIG(status), _pid); + logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid); } close(_pipefd[0]); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/GenerateFlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp index 34c0ae2..3741a8f 100644 --- a/libminifi/src/processors/GenerateFlowFile.cpp +++ b/libminifi/src/processors/GenerateFlowFile.cpp @@ -40,20 +40,11 @@ namespace minifi { namespace processors { const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary"; const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text"; -core::Property GenerateFlowFile::FileSize( - "File Size", "The size of the file that will be used", "1 kB"); -core::Property GenerateFlowFile::BatchSize( - "Batch Size", - "The number of FlowFiles to be transferred in each invocation", "1"); -core::Property GenerateFlowFile::DataFormat( - "Data Format", "Specifies whether the data should be Text or Binary", - GenerateFlowFile::DATA_FORMAT_BINARY); -core::Property GenerateFlowFile::UniqueFlowFiles( - "Unique FlowFiles", - "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", - "true"); -core::Relationship GenerateFlowFile::Success( - "success", "success operational on the flow record"); +core::Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB"); +core::Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1"); +core::Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY); +core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles", "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true"); +core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record"); void GenerateFlowFile::initialize() { // Set the supported properties @@ -69,8 +60,7 @@ void GenerateFlowFile::initialize() { setSupportedRelationships(relationships); } -void GenerateFlowFile::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { +void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { int64_t batchSize = 1; bool uniqueFlowFile = true; int64_t fileSize = 1024; @@ -83,8 +73,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::Property::StringToInt(value, batchSize); } if (context->getProperty(UniqueFlowFiles.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, - uniqueFlowFile); + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, uniqueFlowFile); } if (!uniqueFlowFile) { @@ -102,8 +91,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, } for (int i = 0; i < batchSize; i++) { // For each batch - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< - FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (!flowFile) return; if (fileSize > 0) @@ -126,8 +114,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, GenerateFlowFile::WriteCallback callback(_data, _dataSize); for (int i = 0; i < batchSize; i++) { // For each batch - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< - FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (!flowFile) return; if (fileSize > 0) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/GetFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp index e39afec..f1dbb21 100644 --- a/libminifi/src/processors/GetFile.cpp +++ b/libminifi/src/processors/GetFile.cpp @@ -44,50 +44,22 @@ namespace nifi { namespace minifi { namespace processors { - - - -core::Property GetFile::BatchSize( - "Batch Size", "The maximum number of files to pull in each iteration", - "10"); -core::Property GetFile::Directory( - "Input Directory", "The input directory from which to pull files", "."); -core::Property GetFile::IgnoreHiddenFile( - "Ignore Hidden Files", - "Indicates whether or not hidden files should be ignored", "true"); -core::Property GetFile::KeepSourceFile( - "Keep Source File", - "If true, the file is not deleted after it has been copied to the Content Repository", - "false"); -core::Property GetFile::MaxAge( - "Maximum File Age", - "The minimum age that a file must be in order to be pulled;" - " any file younger than this amount of time (according to last modification date) will be ignored", - "0 sec"); -core::Property GetFile::MinAge( - "Minimum File Age", - "The maximum age that a file must be in order to be pulled; any file" - "older than this amount of time (according to last modification date) will be ignored", - "0 sec"); -core::Property GetFile::MaxSize( - "Maximum File Size", - "The maximum size that a file can be in order to be pulled", "0 B"); -core::Property GetFile::MinSize( - "Minimum File Size", - "The minimum size that a file must be in order to be pulled", "0 B"); -core::Property GetFile::PollInterval( - "Polling Interval", - "Indicates how long to wait before performing a directory listing", - "0 sec"); -core::Property GetFile::Recurse( - "Recurse Subdirectories", - "Indicates whether or not to pull files from subdirectories", "true"); -core::Property GetFile::FileFilter( - "File Filter", - "Only files whose names match the given regular expression will be picked up", - "[^\\.].*"); -core::Relationship GetFile::Success("success", - "All files are routed to success"); +core::Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10"); +core::Property GetFile::Directory("Input Directory", "The input directory from which to pull files", "."); +core::Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true"); +core::Property GetFile::KeepSourceFile("Keep Source File", "If true, the file is not deleted after it has been copied to the Content Repository", "false"); +core::Property GetFile::MaxAge("Maximum File Age", "The minimum age that a file must be in order to be pulled;" + " any file younger than this amount of time (according to last modification date) will be ignored", + "0 sec"); +core::Property GetFile::MinAge("Minimum File Age", "The maximum age that a file must be in order to be pulled; any file" + "older than this amount of time (according to last modification date) will be ignored", + "0 sec"); +core::Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B"); +core::Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B"); +core::Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait before performing a directory listing", "0 sec"); +core::Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not to pull files from subdirectories", "true"); +core::Property GetFile::FileFilter("File Filter", "Only files whose names match the given regular expression will be picked up", "[^\\.].*"); +core::Relationship GetFile::Success("success", "All files are routed to success"); void GetFile::initialize() { // Set the supported properties @@ -110,8 +82,7 @@ void GetFile::initialize() { setSupportedRelationships(relationships); } -void GetFile::onSchedule(core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory) { +void GetFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { std::string value; if (context->getProperty(Directory.getName(), value)) { @@ -121,27 +92,21 @@ void GetFile::onSchedule(core::ProcessContext *context, core::Property::StringToInt(value, request_.batchSize); } if (context->getProperty(IgnoreHiddenFile.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool( - value, request_.ignoreHiddenFile); + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.ignoreHiddenFile); } if (context->getProperty(KeepSourceFile.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool( - value, request_.keepSourceFile); + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.keepSourceFile); } if (context->getProperty(MaxAge.getName(), value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, request_.maxAge, unit) - && core::Property::ConvertTimeUnitToMS(request_.maxAge, unit, - request_.maxAge)) { + if (core::Property::StringToTime(value, request_.maxAge, unit) && core::Property::ConvertTimeUnitToMS(request_.maxAge, unit, request_.maxAge)) { logger_->log_debug("successfully applied _maxAge"); } } if (context->getProperty(MinAge.getName(), value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, request_.minAge, unit) - && core::Property::ConvertTimeUnitToMS(request_.minAge, unit, - request_.minAge)) { + if (core::Property::StringToTime(value, request_.minAge, unit) && core::Property::ConvertTimeUnitToMS(request_.minAge, unit, request_.minAge)) { logger_->log_debug("successfully applied _minAge"); } } @@ -153,15 +118,12 @@ void GetFile::onSchedule(core::ProcessContext *context, } if (context->getProperty(PollInterval.getName(), value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, request_.pollInterval, unit) - && core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit, - request_.pollInterval)) { + if (core::Property::StringToTime(value, request_.pollInterval, unit) && core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit, request_.pollInterval)) { logger_->log_debug("successfully applied _pollInterval"); } } if (context->getProperty(Recurse.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool( - value, request_.recursive); + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.recursive); } if (context->getProperty(FileFilter.getName(), value)) { @@ -169,13 +131,11 @@ void GetFile::onSchedule(core::ProcessContext *context, } } -void GetFile::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { +void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { // Perform directory list logger_->log_info("Is listing empty %i", isListingEmpty()); if (isListingEmpty()) { - if (request_.pollInterval == 0 - || (getTimeMillis() - last_listing_time_) > request_.pollInterval) { + if (request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > request_.pollInterval) { performListing(request_.directory, request_); last_listing_time_.store(getTimeMillis()); } @@ -190,8 +150,7 @@ void GetFile::onTrigger(core::ProcessContext *context, std::string fileName = list.front(); list.pop(); logger_->log_info("GetFile process %s", fileName.c_str()); - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< - FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (flowFile == nullptr) return; std::size_t found = fileName.find_last_of("/\\"); @@ -224,12 +183,10 @@ void GetFile::putListing(std::string fileName) { _dirList.push(fileName); } -void GetFile::pollListing(std::queue<std::string> &list, - const GetFileRequest &request) { +void GetFile::pollListing(std::queue<std::string> &list, const GetFileRequest &request) { std::lock_guard<std::mutex> lock(mutex_); - while (!_dirList.empty() - && (request.maxSize == 0 || list.size() < request.maxSize)) { + while (!_dirList.empty() && (request.maxSize == 0 || list.size() < request.maxSize)) { std::string fileName = _dirList.front(); _dirList.pop(); list.push(fileName); @@ -238,8 +195,7 @@ void GetFile::pollListing(std::queue<std::string> &list, return; } -bool GetFile::acceptFile(std::string fullName, std::string name, - const GetFileRequest &request) { +bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRequest &request) { struct stat statbuf; if (stat(fullName.c_str(), &statbuf) == 0) { @@ -296,8 +252,7 @@ void GetFile::performListing(std::string dir, const GetFileRequest &request) { std::string d_name = entry->d_name; if ((entry->d_type & DT_DIR)) { // if this is a directory - if (request.recursive && strcmp(d_name.c_str(), "..") != 0 - && strcmp(d_name.c_str(), ".") != 0) { + if (request.recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) { std::string path = dir + "/" + d_name; performListing(path, request); }
