http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index eca84be..e472a9a 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -30,765 +30,271 @@ #include <unistd.h> #include <future> #include "FlowController.h" -#include "ProcessContext.h" +#include "core/ProcessContext.h" +#include "core/ProcessGroup.h" #include "utils/StringUtils.h" +#include "core/core.h" +#include "core/repository/FlowFileRepository.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +#define DEFAULT_CONFIG_NAME "conf/flow.yml" + +FlowController::FlowController( + std::shared_ptr<core::Repository> provenance_repo, + std::shared_ptr<core::Repository> flow_file_repo, + std::unique_ptr<core::FlowConfiguration> flow_configuration, + const std::string name, bool headless_mode) + : CoreComponent(core::getClassName<FlowController>()), + root_(nullptr), + max_timer_driven_threads_(0), + max_event_driven_threads_(0), + running_(false), + initialized_(false), + provenance_repo_(provenance_repo), + flow_file_repo_(flow_file_repo), + protocol_(0), + _timerScheduler(provenance_repo_), + _eventScheduler(provenance_repo_), + flow_configuration_(std::move(flow_configuration)) { + if (provenance_repo == nullptr) + throw std::runtime_error("Provenance Repo should not be null"); + if (flow_file_repo == nullptr) + throw std::runtime_error("Flow Repo should not be null"); + + uuid_generate(uuid_); + setUUID(uuid_); + + // Setup the default values + if (flow_configuration_ != nullptr) { + configuration_filename_ = flow_configuration_->getConfigurationPath(); + } + max_event_driven_threads_ = DEFAULT_MAX_EVENT_DRIVEN_THREAD; + max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD; + running_ = false; + initialized_ = false; + root_ = NULL; + + protocol_ = new FlowControlProtocol(this); + + // NiFi config properties + configure_ = Configure::getConfigure(); + + if (!headless_mode) { + std::string rawConfigFileString; + configure_->get(Configure::nifi_flow_configuration_file, + rawConfigFileString); + + if (!rawConfigFileString.empty()) { + configuration_filename_ = rawConfigFileString; + } + + std::string adjustedFilename; + if (!configuration_filename_.empty()) { + // perform a naive determination if this is a relative path + if (configuration_filename_.c_str()[0] != '/') { + adjustedFilename = adjustedFilename + configure_->getHome() + "/" + + configuration_filename_; + } else { + adjustedFilename = configuration_filename_; + } + } + + initializePaths(adjustedFilename); + } -FlowController *FlowControllerFactory::_flowController(NULL); - -FlowControllerImpl::FlowControllerImpl(std::string name) { - uuid_generate(_uuid); - - _name = name; - // Setup the default values - _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME; - _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD; - _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD; - _running = false; - _initialized = false; - _root = NULL; - logger_ = Logger::getLogger(); - _protocol = new FlowControlProtocol(this); - - // NiFi config properties - configure_ = Configure::getConfigure(); - - std::string rawConfigFileString; - configure_->get(Configure::nifi_flow_configuration_file, - rawConfigFileString); - - if (!rawConfigFileString.empty()) { - _configurationFileName = rawConfigFileString; - } - - char *path = NULL; - char full_path[PATH_MAX]; - - std::string adjustedFilename; - if (!_configurationFileName.empty()) { - // perform a naive determination if this is a relative path - if (_configurationFileName.c_str()[0] != '/') { - adjustedFilename = adjustedFilename + configure_->getHome() + "/" - + _configurationFileName; - } else { - adjustedFilename = _configurationFileName; - } - } - - path = realpath(adjustedFilename.c_str(), full_path); - - std::string pathString(path); - _configurationFileName = pathString; - logger_->log_info("FlowController NiFi Configuration file %s", pathString.c_str()); - - // Create the content repo directory if needed - struct stat contentDirStat; - - if (stat(ResourceClaim::default_directory_path.c_str(), &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) - { - path = realpath(ResourceClaim::default_directory_path.c_str(), full_path); - logger_->log_info("FlowController content directory %s", full_path); - } - else - { - if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1) - { - logger_->log_error("FlowController content directory creation failed"); - exit(1); - } - } - - - std::string clientAuthStr; - - if (!path) { - logger_->log_error( - "Could not locate path from provided configuration file name (%s). Exiting.", - full_path); - exit(1); - } - - - // Create repos for flow record and provenance - _flowfileRepo = new FlowFileRepository(); - _flowfileRepo->initialize(); - _provenanceRepo = new ProvenanceRepository(); - _provenanceRepo->initialize(); } -FlowControllerImpl::~FlowControllerImpl() { +void FlowController::initializePaths(const std::string &adjustedFilename) { + char *path = NULL; + char full_path[PATH_MAX]; + path = realpath(adjustedFilename.c_str(), full_path); + + if (path == NULL) { + throw std::runtime_error( + "Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists"); + } + std::string pathString(path); + configuration_filename_ = pathString; + logger_->log_info("FlowController NiFi Configuration file %s", + pathString.c_str()); + + // Create the content repo directory if needed + struct stat contentDirStat; + + if (stat(ResourceClaim::default_directory_path.c_str(), &contentDirStat) + != -1&& S_ISDIR(contentDirStat.st_mode)) { + path = realpath(ResourceClaim::default_directory_path.c_str(), full_path); + logger_->log_info("FlowController content directory %s", full_path); + } else { + if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1) { + logger_->log_error("FlowController content directory creation failed"); + exit(1); + } + } - stop(true); - unload(); - if (NULL != _protocol) - delete _protocol; - if (NULL != _provenanceRepo) - delete _provenanceRepo; - if (NULL != _flowfileRepo) - delete _flowfileRepo; + std::string clientAuthStr; -} + if (!path) { + logger_->log_error( + "Could not locate path from provided configuration file name (%s). Exiting.", + full_path); + exit(1); + } -void FlowControllerImpl::stop(bool force) { +} - if (_running) { - // immediately indicate that we are not running - _running = false; +FlowController::~FlowController() { + stop(true); + unload(); + if (NULL != protocol_) + delete protocol_; - logger_->log_info("Stop Flow Controller"); - this->_timerScheduler.stop(); - this->_eventScheduler.stop(); - this->_flowfileRepo->stop(); - this->_provenanceRepo->stop(); - // Wait for sometime for thread stop - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - if (this->_root) - this->_root->stopProcessing(&this->_timerScheduler, - &this->_eventScheduler); +} - } +void FlowController::stop(bool force) { + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + if (running_) { + // immediately indicate that we are not running + running_ = false; + + logger_->log_info("Stop Flow Controller"); + this->_timerScheduler.stop(); + this->_eventScheduler.stop(); + this->flow_file_repo_->stop(); + this->provenance_repo_->stop(); + // Wait for sometime for thread stop + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + if (this->root_) + this->root_->stopProcessing(&this->_timerScheduler, + &this->_eventScheduler); + + } } /** * This function will attempt to unload yaml and stop running Processors. * * If the latter attempt fails or does not complete within the prescribed - * period, _running will be set to false and we will return. + * period, running_ will be set to false and we will return. * * @param timeToWaitMs Maximum time to wait before manually * marking running as false. */ -void FlowControllerImpl::waitUnload(const uint64_t timeToWaitMs) { - if (_running) { - // use the current time and increment with the provided argument. - std::chrono::system_clock::time_point wait_time = - std::chrono::system_clock::now() - + std::chrono::milliseconds(timeToWaitMs); - - // create an asynchronous future. - std::future<void> unload_task = std::async(std::launch::async, - [this]() {unload();}); - - if (std::future_status::ready == unload_task.wait_until(wait_time)) { - _running = false; - } - - } -} - - -void FlowControllerImpl::unload() { - if (_running) { - stop(true); - } - if (_initialized) { - logger_->log_info("Unload Flow Controller"); - if (_root) - delete _root; - _root = NULL; - _initialized = false; - _name = ""; - } - - return; -} - -Processor *FlowControllerImpl::createProcessor(std::string name, uuid_t uuid) { - Processor *processor = NULL; - if (name == GenerateFlowFile::ProcessorName) { - processor = new GenerateFlowFile(name, uuid); - } else if (name == LogAttribute::ProcessorName) { - processor = new LogAttribute(name, uuid); - } else if (name == RealTimeDataCollector::ProcessorName) { - processor = new RealTimeDataCollector(name, uuid); - } else if (name == GetFile::ProcessorName) { - processor = new GetFile(name, uuid); - } else if (name == PutFile::ProcessorName) { - processor = new PutFile(name, uuid); - } else if (name == TailFile::ProcessorName) { - processor = new TailFile(name, uuid); - } else if (name == ListenSyslog::ProcessorName) { - processor = new ListenSyslog(name, uuid); - } else if (name == ListenHTTP::ProcessorName) { - processor = new ListenHTTP(name, uuid); - } else if (name == ExecuteProcess::ProcessorName) { - processor = new ExecuteProcess(name, uuid); - } else if (name == AppendHostInfo::ProcessorName) { - processor = new AppendHostInfo(name, uuid); - } else { - logger_->log_error("No Processor defined for %s", name.c_str()); - return NULL; - } - - //! initialize the processor - processor->initialize(); - - return processor; -} - -ProcessGroup *FlowControllerImpl::createRootProcessGroup(std::string name, - uuid_t uuid) { - return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid); -} - -ProcessGroup *FlowControllerImpl::createRemoteProcessGroup(std::string name, - uuid_t uuid) { - return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid); -} - -Connection *FlowControllerImpl::createConnection(std::string name, - uuid_t uuid) { - return new Connection(name, uuid); -} - -#ifdef YAML_SUPPORT -void FlowControllerImpl::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { - uuid_t uuid; - ProcessGroup *group = NULL; - - std::string flowName = rootFlowNode["name"].as<std::string>(); - std::string id = rootFlowNode["id"].as<std::string>(); - - uuid_parse(id.c_str(), uuid); - - logger_->log_debug("parseRootProcessGroup: id => [%s]", id.c_str()); - logger_->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str()); - group = this->createRootProcessGroup(flowName, uuid); - this->_root = group; - this->_name = flowName; -} - -void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode, - ProcessGroup *parentGroup) { - int64_t schedulingPeriod = -1; - int64_t penalizationPeriod = -1; - int64_t yieldPeriod = -1; - int64_t runDurationNanos = -1; - uuid_t uuid; - Processor *processor = NULL; - - if (!parentGroup) { - logger_->log_error("parseProcessNodeYaml: no parent group exists"); - return; - } - - if (processorsNode) { - - if (processorsNode.IsSequence()) { - // Evaluate sequence of processors - int numProcessors = processorsNode.size(); - - for (YAML::const_iterator iter = processorsNode.begin(); - iter != processorsNode.end(); ++iter) { - ProcessorConfig procCfg; - YAML::Node procNode = iter->as<YAML::Node>(); - - procCfg.name = procNode["name"].as<std::string>(); - procCfg.id = procNode["id"].as<std::string>(); - logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", - procCfg.name.c_str(), procCfg.id.c_str()); - procCfg.javaClass = procNode["class"].as<std::string>(); - logger_->log_debug("parseProcessorNode: class => [%s]", - procCfg.javaClass.c_str()); - - uuid_parse(procCfg.id.c_str(), uuid); - - // Determine the processor name only from the Java class - int lastOfIdx = procCfg.javaClass.find_last_of("."); - if (lastOfIdx != std::string::npos) { - lastOfIdx++; // if a value is found, increment to move beyond the . - int nameLength = procCfg.javaClass.length() - lastOfIdx; - std::string processorName = procCfg.javaClass.substr( - lastOfIdx, nameLength); - processor = this->createProcessor(processorName, uuid); - } - - if (!processor) { - logger_->log_error( - "Could not create a processor %s with name %s", - procCfg.name.c_str(), procCfg.id.c_str()); - throw std::invalid_argument( - "Could not create processor " + procCfg.name); - } - processor->setName(procCfg.name); - - procCfg.maxConcurrentTasks = - procNode["max concurrent tasks"].as<std::string>(); - logger_->log_debug( - "parseProcessorNode: max concurrent tasks => [%s]", - procCfg.maxConcurrentTasks.c_str()); - procCfg.schedulingStrategy = procNode["scheduling strategy"].as< - std::string>(); - logger_->log_debug( - "parseProcessorNode: scheduling strategy => [%s]", - procCfg.schedulingStrategy.c_str()); - procCfg.schedulingPeriod = procNode["scheduling period"].as< - std::string>(); - logger_->log_debug( - "parseProcessorNode: scheduling period => [%s]", - procCfg.schedulingPeriod.c_str()); - procCfg.penalizationPeriod = procNode["penalization period"].as< - std::string>(); - logger_->log_debug( - "parseProcessorNode: penalization period => [%s]", - procCfg.penalizationPeriod.c_str()); - procCfg.yieldPeriod = - procNode["yield period"].as<std::string>(); - logger_->log_debug("parseProcessorNode: yield period => [%s]", - procCfg.yieldPeriod.c_str()); - procCfg.yieldPeriod = procNode["run duration nanos"].as< - std::string>(); - logger_->log_debug( - "parseProcessorNode: run duration nanos => [%s]", - procCfg.runDurationNanos.c_str()); - - // handle auto-terminated relationships - YAML::Node autoTerminatedSequence = - procNode["auto-terminated relationships list"]; - std::vector<std::string> rawAutoTerminatedRelationshipValues; - if (autoTerminatedSequence.IsSequence() - && !autoTerminatedSequence.IsNull() - && autoTerminatedSequence.size() > 0) { - for (YAML::const_iterator relIter = - autoTerminatedSequence.begin(); - relIter != autoTerminatedSequence.end(); - ++relIter) { - std::string autoTerminatedRel = - relIter->as<std::string>(); - rawAutoTerminatedRelationshipValues.push_back( - autoTerminatedRel); - } - } - procCfg.autoTerminatedRelationships = - rawAutoTerminatedRelationshipValues; - - // handle processor properties - YAML::Node propertiesNode = procNode["Properties"]; - parsePropertiesNodeYaml(&propertiesNode, processor); - - // Take care of scheduling - TimeUnit unit; - if (Property::StringToTime(procCfg.schedulingPeriod, - schedulingPeriod, unit) - && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, - schedulingPeriod)) { - logger_->log_debug( - "convert: parseProcessorNode: schedulingPeriod => [%d] ns", - schedulingPeriod); - processor->setSchedulingPeriodNano(schedulingPeriod); - } - - if (Property::StringToTime(procCfg.penalizationPeriod, - penalizationPeriod, unit) - && Property::ConvertTimeUnitToMS(penalizationPeriod, - unit, penalizationPeriod)) { - logger_->log_debug( - "convert: parseProcessorNode: penalizationPeriod => [%d] ms", - penalizationPeriod); - processor->setPenalizationPeriodMsec(penalizationPeriod); - } - - if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, - unit) - && Property::ConvertTimeUnitToMS(yieldPeriod, unit, - yieldPeriod)) { - logger_->log_debug( - "convert: parseProcessorNode: yieldPeriod => [%d] ms", - yieldPeriod); - processor->setYieldPeriodMsec(yieldPeriod); - } - - // Default to running - processor->setScheduledState(RUNNING); - - if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { - processor->setSchedulingStrategy(TIMER_DRIVEN); - logger_->log_debug("setting scheduling strategy as %s", - procCfg.schedulingStrategy.c_str()); - } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { - processor->setSchedulingStrategy(EVENT_DRIVEN); - logger_->log_debug("setting scheduling strategy as %s", - procCfg.schedulingStrategy.c_str()); - } else { - processor->setSchedulingStrategy(CRON_DRIVEN); - logger_->log_debug("setting scheduling strategy as %s", - procCfg.schedulingStrategy.c_str()); - - } - - int64_t maxConcurrentTasks; - if (Property::StringToInt(procCfg.maxConcurrentTasks, - maxConcurrentTasks)) { - logger_->log_debug( - "parseProcessorNode: maxConcurrentTasks => [%d]", - maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - - if (Property::StringToInt(procCfg.runDurationNanos, - runDurationNanos)) { - logger_->log_debug( - "parseProcessorNode: runDurationNanos => [%d]", - runDurationNanos); - processor->setRunDurationNano(runDurationNanos); - } - - std::set<Relationship> autoTerminatedRelationships; - for (auto &&relString : procCfg.autoTerminatedRelationships) { - Relationship relationship(relString, ""); - logger_->log_debug( - "parseProcessorNode: autoTerminatedRelationship => [%s]", - relString.c_str()); - autoTerminatedRelationships.insert(relationship); - } - - processor->setAutoTerminatedRelationships( - autoTerminatedRelationships); - - parentGroup->addProcessor(processor); - } - } - } else { - throw new std::invalid_argument( - "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); - } -} - -void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, - ProcessGroup *parentGroup) { - uuid_t uuid; - - if (!parentGroup) { - logger_->log_error( - "parseRemoteProcessGroupYaml: no parent group exists"); - return; - } - - if (rpgNode) { - if (rpgNode->IsSequence()) { - for (YAML::const_iterator iter = rpgNode->begin(); - iter != rpgNode->end(); ++iter) { - YAML::Node rpgNode = iter->as<YAML::Node>(); - - auto name = rpgNode["name"].as<std::string>(); - auto id = rpgNode["id"].as<std::string>(); - - logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", - name.c_str(), id.c_str()); - - std::string url = rpgNode["url"].as<std::string>(); - logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", - url.c_str()); - - std::string timeout = rpgNode["timeout"].as<std::string>(); - logger_->log_debug( - "parseRemoteProcessGroupYaml: timeout => [%s]", - timeout.c_str()); - - std::string yieldPeriod = - rpgNode["yield period"].as<std::string>(); - logger_->log_debug( - "parseRemoteProcessGroupYaml: yield period => [%s]", - yieldPeriod.c_str()); - - YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>(); - YAML::Node outputPorts = - rpgNode["Output Ports"].as<YAML::Node>(); - ProcessGroup *group = NULL; - - uuid_parse(id.c_str(), uuid); - - int64_t timeoutValue = -1; - int64_t yieldPeriodValue = -1; - - group = this->createRemoteProcessGroup(name.c_str(), uuid); - group->setParent(parentGroup); - parentGroup->addProcessGroup(group); - - TimeUnit unit; - - if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) - && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, - yieldPeriodValue) && group) { - logger_->log_debug( - "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", - yieldPeriodValue); - group->setYieldPeriodMsec(yieldPeriodValue); - } - - if (Property::StringToTime(timeout, timeoutValue, unit) - && Property::ConvertTimeUnitToMS(timeoutValue, unit, - timeoutValue) && group) { - logger_->log_debug( - "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", - timeoutValue); - group->setTimeOut(timeoutValue); - } - - group->setTransmitting(true); - group->setURL(url); - - if (inputPorts && inputPorts.IsSequence()) { - for (YAML::const_iterator portIter = inputPorts.begin(); - portIter != inputPorts.end(); ++portIter) { - logger_->log_debug("Got a current port, iterating..."); - - YAML::Node currPort = portIter->as<YAML::Node>(); - - this->parsePortYaml(&currPort, group, SEND); - } // for node - } - if (outputPorts && outputPorts.IsSequence()) { - for (YAML::const_iterator portIter = outputPorts.begin(); - portIter != outputPorts.end(); ++portIter) { - logger_->log_debug("Got a current port, iterating..."); - - YAML::Node currPort = portIter->as<YAML::Node>(); - - this->parsePortYaml(&currPort, group, RECEIVE); - } // for node - } - - } - } - } -} - -void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode, - ProcessGroup *parent) { - uuid_t uuid; - Connection *connection = NULL; - - if (!parent) { - logger_->log_error("parseProcessNode: no parent group was provided"); - return; - } - - if (connectionsNode) { - - if (connectionsNode->IsSequence()) { - for (YAML::const_iterator iter = connectionsNode->begin(); - iter != connectionsNode->end(); ++iter) { - - YAML::Node connectionNode = iter->as<YAML::Node>(); - - std::string name = connectionNode["name"].as<std::string>(); - std::string id = connectionNode["id"].as<std::string>(); - std::string destId = connectionNode["destination id"].as< - std::string>(); - - uuid_parse(id.c_str(), uuid); - - logger_->log_debug( - "Created connection with UUID %s and name %s", id.c_str(), - name.c_str()); - connection = this->createConnection(name, uuid); - auto rawRelationship = - connectionNode["source relationship name"].as< - std::string>(); - Relationship relationship(rawRelationship, ""); - logger_->log_debug("parseConnection: relationship => [%s]", - rawRelationship.c_str()); - if (connection) - connection->setRelationship(relationship); - std::string connectionSrcProcId = - connectionNode["source id"].as<std::string>(); - uuid_t srcUUID; - uuid_parse(connectionSrcProcId.c_str(), srcUUID); - - Processor *srcProcessor = this->_root->findProcessor( - srcUUID); - - if (!srcProcessor) { - logger_->log_error( - "Could not locate a source with id %s to create a connection", - connectionSrcProcId.c_str()); - throw std::invalid_argument( - "Could not locate a source with id %s to create a connection " - + connectionSrcProcId); - } - - uuid_t destUUID; - uuid_parse(destId.c_str(), destUUID); - Processor *destProcessor = this->_root->findProcessor(destUUID); - // If we could not find name, try by UUID - if (!destProcessor) { - uuid_t destUuid; - uuid_parse(destId.c_str(), destUuid); - destProcessor = this->_root->findProcessor(destUuid); - } - if (destProcessor) { - std::string destUuid = destProcessor->getUUIDStr(); - } - - uuid_t srcUuid; - uuid_t destUuid; - srcProcessor->getUUID(srcUuid); - connection->setSourceProcessorUUID(srcUuid); - destProcessor->getUUID(destUuid); - connection->setDestinationProcessorUUID(destUuid); - - if (connection) { - parent->addConnection(connection); - } - } - } - - if (connection) - parent->addConnection(connection); - - return; - } -} - -void FlowControllerImpl::parsePortYaml(YAML::Node *portNode, - ProcessGroup *parent, TransferDirection direction) { - uuid_t uuid; - Processor *processor = NULL; - RemoteProcessorGroupPort *port = NULL; - - if (!parent) { - logger_->log_error("parseProcessNode: no parent group existed"); - return; - } - - YAML::Node inputPortsObj = portNode->as<YAML::Node>(); - - // generate the random UIID - uuid_generate(uuid); - - auto portId = inputPortsObj["id"].as<std::string>(); - auto nameStr = inputPortsObj["name"].as<std::string>(); - uuid_parse(portId.c_str(), uuid); - - port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid); - - processor = (Processor *) port; - port->setDirection(direction); - port->setTimeOut(parent->getTimeOut()); - port->setTransmitting(true); - processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); - processor->initialize(); - - // handle port properties - YAML::Node nodeVal = portNode->as<YAML::Node>(); - YAML::Node propertiesNode = nodeVal["Properties"]; - - parsePropertiesNodeYaml(&propertiesNode, processor); - - // add processor to parent - parent->addProcessor(processor); - processor->setScheduledState(RUNNING); - auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as< - std::string>(); - int64_t maxConcurrentTasks; - if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", - maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); +void FlowController::waitUnload(const uint64_t timeToWaitMs) { + if (running_) { + // use the current time and increment with the provided argument. + std::chrono::system_clock::time_point wait_time = + std::chrono::system_clock::now() + + std::chrono::milliseconds(timeToWaitMs); + + // create an asynchronous future. + std::future<void> unload_task = std::async(std::launch::async, + [this]() {unload();}); + + if (std::future_status::ready == unload_task.wait_until(wait_time)) { + running_ = false; + } + } } -void FlowControllerImpl::parsePropertiesNodeYaml(YAML::Node *propertiesNode, - Processor *processor) { - // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated - for (YAML::const_iterator propsIter = propertiesNode->begin(); - propsIter != propertiesNode->end(); ++propsIter) { - std::string propertyName = propsIter->first.as<std::string>(); - YAML::Node propertyValueNode = propsIter->second; - if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) { - std::string rawValueString = propertyValueNode.as<std::string>(); - if (!processor->setProperty(propertyName, rawValueString)) { - logger_->log_warn( - "Received property %s with value %s but is not one of the properties for %s", - propertyName.c_str(), rawValueString.c_str(), - processor->getName().c_str()); - } - } - } +void FlowController::unload() { + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + if (running_) { + stop(true); + } + if (initialized_) { + logger_->log_info("Unload Flow Controller"); + root_ = nullptr; + initialized_ = false; + name_ = ""; + } + + return; } -#endif /* ifdef YAML_SUPPORT */ -void FlowControllerImpl::load() { - if (_running) { - stop(true); - } - if (!_initialized) { - logger_->log_info("Load Flow Controller from file %s", _configurationFileName.c_str()); - -#ifdef YAML_SUPPORT - YAML::Node flow = YAML::LoadFile(_configurationFileName); - - YAML::Node flowControllerNode = flow["Flow Controller"]; - YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY]; - YAML::Node connectionsNode = flow["Connections"]; - YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"]; +void FlowController::load() { + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + if (running_) { + stop(true); + } + if (!initialized_) { + logger_->log_info("Load Flow Controller from file %s", + configuration_filename_.c_str()); - // Create the root process group - parseRootProcessGroupYaml(flowControllerNode); - parseProcessorNodeYaml(processorsNode, this->_root); - parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root); - parseConnectionYaml(&connectionsNode, this->_root); + this->root_ = flow_configuration_->getRoot(configuration_filename_); - // Load Flow File from Repo - loadFlowRepo(); -#endif + // Load Flow File from Repo + loadFlowRepo(); - _initialized = true; - } + initialized_ = true; + } } -void FlowControllerImpl::loadFlowRepo() -{ - if (this->_flowfileRepo && this->_flowfileRepo->isEnable()) - { - std::map<std::string, Connection *> connectionMap; - this->_root->getConnections(&connectionMap); - this->_flowfileRepo->loadFlowFileToConnections(&connectionMap); - } -} - -void FlowControllerImpl::reload(std::string yamlFile) -{ - logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str()); +void FlowController::reload(std::string yamlFile) { + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + logger_->log_info("Starting to reload Flow Controller with yaml %s", + yamlFile.c_str()); + stop(true); + unload(); + std::string oldYamlFile = this->configuration_filename_; + this->configuration_filename_ = yamlFile; + load(); + start(); + if (this->root_ != nullptr) { + this->configuration_filename_ = oldYamlFile; + logger_->log_info("Rollback Flow Controller to YAML %s", + oldYamlFile.c_str()); stop(true); unload(); - std::string oldYamlFile = this->_configurationFileName; - this->_configurationFileName = yamlFile; load(); start(); - if (!this->_root) - { - this->_configurationFileName = oldYamlFile; - logger_->log_info("Rollback Flow Controller to YAML %s", oldYamlFile.c_str()); - stop(true); - unload(); - load(); - start(); + } +} + +void FlowController::loadFlowRepo() { + if (this->flow_file_repo_) { + std::map<std::string, std::shared_ptr<Connection>> connectionMap; + if (this->root_ != nullptr) { + this->root_->getConnections(connectionMap); } + auto rep = std::static_pointer_cast<core::repository::FlowFileRepository>( + flow_file_repo_); + rep->loadFlowFileToConnections(connectionMap); + } } -bool FlowControllerImpl::start() { - if (!_initialized) { - logger_->log_error( - "Can not start Flow Controller because it has not been initialized"); - return false; - } else { - - if (!_running) { - logger_->log_info("Starting Flow Controller"); - this->_timerScheduler.start(); - this->_eventScheduler.start(); - if (this->_root) - this->_root->startProcessing(&this->_timerScheduler, - &this->_eventScheduler); - _running = true; - this->_protocol->start(); - this->_provenanceRepo->start(); - this->_flowfileRepo->start(); - logger_->log_info("Started Flow Controller"); - } - return true; - } +bool FlowController::start() { + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + if (!initialized_) { + logger_->log_error( + "Can not start Flow Controller because it has not been initialized"); + return false; + } else { + + if (!running_) { + logger_->log_info("Starting Flow Controller"); + this->_timerScheduler.start(); + this->_eventScheduler.start(); + if (this->root_ != nullptr) { + this->root_->startProcessing(&this->_timerScheduler, + &this->_eventScheduler); + } + running_ = true; + this->protocol_->start(); + this->provenance_repo_->start(); + this->flow_file_repo_->start(); + logger_->log_info("Started Flow Controller"); + } + return true; + } } + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index a2f2323..7383574 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -27,253 +27,339 @@ #include <cstdio> #include "FlowFileRecord.h" -#include "Relationship.h" -#include "Logger.h" -#include "FlowController.h" -#include "FlowFileRepository.h" - -std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0); - -FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim) -: _size(0), - _id(_localFlowSeqNumber.load()), - _offset(0), - _penaltyExpirationMs(0), - _claim(claim), - _isStoredToRepo(false), - _markedDelete(false), - _connection(NULL), - _orginalConnection(NULL) -{ - _entryDate = getTimeMillis(); - _lineageStartDate = _entryDate; - - char uuidStr[37]; - - // Generate the global UUID for the flow record - uuid_generate(_uuid); - // Increase the local ID for the flow record - ++_localFlowSeqNumber; - uuid_unparse_lower(_uuid, uuidStr); - _uuidStr = uuidStr; - - // Populate the default attributes - addAttribute(FILENAME, std::to_string(getTimeNano())); - addAttribute(PATH, DEFAULT_FLOWFILE_PATH); - addAttribute(UUID, uuidStr); - // Populate the attributes from the input - std::map<std::string, std::string>::iterator it; - for (it = attributes.begin(); it!= attributes.end(); it++) - { - addAttribute(it->first, it->second); - } +#include "core/logging/Logger.h" +#include "core/Relationship.h" +#include "core/Repository.h" - _snapshot = false; +namespace org { +namespace apache { +namespace nifi { +namespace minifi { - if (_claim) - // Increase the flow file record owned count for the resource claim - _claim->increaseFlowFileRecordOwnedCount(); - logger_ = Logger::getLogger(); -} +std::atomic<uint64_t> FlowFileRecord::local_flow_seq_number_(0); -FlowFileRecord::FlowFileRecord(FlowFileEventRecord *event) -: _size(0), - _id(_localFlowSeqNumber.load()), - _offset(0), - _penaltyExpirationMs(0), - _claim(NULL), - _isStoredToRepo(false), - _markedDelete(false), - _connection(NULL), - _orginalConnection(NULL) -{ - _entryDate = event->getFlowFileEntryDate(); - _lineageStartDate = event->getlineageStartDate(); - _size = event->getFileSize(); - _offset = event->getFileOffset(); - _lineageIdentifiers = event->getLineageIdentifiers(); - _attributes = event->getAttributes(); - _snapshot = false; - _uuidStr = event->getFlowFileUuid(); - uuid_parse(_uuidStr.c_str(), _uuid); - - if (_size > 0) - { - _claim = new ResourceClaim(); - } +FlowFileRecord::FlowFileRecord( + std::shared_ptr<core::Repository> flow_repository, + std::map<std::string, std::string> attributes, + std::shared_ptr<ResourceClaim> claim) + : FlowFile(), + flow_repository_(flow_repository) { + + id_ = local_flow_seq_number_.load(); + claim_ = claim; + // Increase the local ID for the flow record + ++local_flow_seq_number_; + // Populate the default attributes + addKeyedAttribute(FILENAME, std::to_string(getTimeNano())); + addKeyedAttribute(PATH, DEFAULT_FLOWFILE_PATH); + addKeyedAttribute(UUID, getUUIDStr()); + // Populate the attributes from the input + std::map<std::string, std::string>::iterator it; + for (it = attributes.begin(); it != attributes.end(); it++) { + FlowFile::addAttribute(it->first, it->second); + } + + snapshot_ = false; - if (_claim) - { - _claim->setContentFullPath(event->getContentFullPath()); - // Increase the flow file record owned count for the resource claim - _claim->increaseFlowFileRecordOwnedCount(); + if (claim_ != nullptr) + // Increase the flow file record owned count for the resource claim + claim_->increaseFlowFileRecordOwnedCount(); + logger_ = logging::Logger::getLogger(); +} + +FlowFileRecord::FlowFileRecord( + std::shared_ptr<core::Repository> flow_repository, + std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection) + : FlowFile(), + snapshot_(""), + flow_repository_(flow_repository) { + entry_date_ = event->getEntryDate(); + lineage_start_date_ = event->getlineageStartDate(); + lineage_Identifiers_ = event->getlineageIdentifiers(); + uuid_str_ = event->getUUIDStr(); + attributes_ = event->getAttributes(); + size_ = event->getSize(); + offset_ = event->getOffset(); + event->getUUID(uuid_); + uuid_connection_ = uuidConnection; + if (event->getResourceClaim()) { + content_full_fath_ = event->getResourceClaim()->getContentFullPath(); } - logger_ = Logger::getLogger(); - ++_localFlowSeqNumber; } +FlowFileRecord::FlowFileRecord( + std::shared_ptr<core::Repository> flow_repository, + std::shared_ptr<core::FlowFile> &event) + : FlowFile(), + uuid_connection_(""), + snapshot_(""), + flow_repository_(flow_repository) { -FlowFileRecord::~FlowFileRecord() -{ - if (!_snapshot) - logger_->log_debug("Delete FlowFile UUID %s", _uuidStr.c_str()); - else - logger_->log_debug("Delete SnapShot FlowFile UUID %s", _uuidStr.c_str()); - if (_claim) - { - // Decrease the flow file record owned count for the resource claim - _claim->decreaseFlowFileRecordOwnedCount(); - if (_claim->getFlowFileRecordOwnedCount() <= 0) - { - logger_->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str()); - std::string value; - if (!FlowControllerFactory::getFlowController()->getFlowFileRepository() || - !FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable() || - !this->_isStoredToRepo || - !FlowControllerFactory::getFlowController()->getFlowFileRepository()->Get(_uuidStr, value)) - { - // if it is persistent to DB already while it is in the queue, we keep the content - std::remove(_claim->getContentFullPath().c_str()); - } - delete _claim; - } - } } -bool FlowFileRecord::addAttribute(FlowAttribute key, std::string value) -{ - const char *keyStr = FlowAttributeKey(key); - if (keyStr) - { - std::string keyString = keyStr; - return addAttribute(keyString, value); - } - else - { - return false; - } +FlowFileRecord::~FlowFileRecord() { + if (!snapshot_) + logger_->log_debug("Delete FlowFile UUID %s", uuid_str_.c_str()); + else + logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuid_str_.c_str()); + if (claim_) { + // Decrease the flow file record owned count for the resource claim + claim_->decreaseFlowFileRecordOwnedCount(); + std::string value; + if (claim_->getFlowFileRecordOwnedCount() <= 0) { + logger_->log_debug("Delete Resource Claim %s", + claim_->getContentFullPath().c_str()); + if (!this->stored || !flow_repository_->Get(uuid_str_, value)) { + std::remove(claim_->getContentFullPath().c_str()); + } + } + } } -bool FlowFileRecord::addAttribute(std::string key, std::string value) -{ - std::map<std::string, std::string>::iterator it = _attributes.find(key); - if (it != _attributes.end()) - { - // attribute already there in the map - return false; - } - else - { - _attributes[key] = value; - return true; - } +bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, std::string value) { + const char *keyStr = FlowAttributeKey(key); + if (keyStr) { + const std::string keyString = keyStr; + return FlowFile::addAttribute(keyString, value); + } else { + return false; + } } -bool FlowFileRecord::removeAttribute(FlowAttribute key) -{ - const char *keyStr = FlowAttributeKey(key); - if (keyStr) - { - std::string keyString = keyStr; - return removeAttribute(keyString); - } - else - { - return false; - } +bool FlowFileRecord::removeKeyedAttribute(FlowAttribute key) { + const char *keyStr = FlowAttributeKey(key); + if (keyStr) { + std::string keyString = keyStr; + return FlowFile::removeAttribute(keyString); + } else { + return false; + } } -bool FlowFileRecord::removeAttribute(std::string key) -{ - std::map<std::string, std::string>::iterator it = _attributes.find(key); - if (it != _attributes.end()) - { - _attributes.erase(key); - return true; - } - else - { - return false; - } +bool FlowFileRecord::updateKeyedAttribute(FlowAttribute key, + std::string value) { + const char *keyStr = FlowAttributeKey(key); + if (keyStr) { + std::string keyString = keyStr; + return FlowFile::updateAttribute(keyString, value); + } else { + return false; + } } -bool FlowFileRecord::updateAttribute(FlowAttribute key, std::string value) -{ - const char *keyStr = FlowAttributeKey(key); - if (keyStr) - { - std::string keyString = keyStr; - return updateAttribute(keyString, value); - } - else - { - return false; - } +bool FlowFileRecord::getKeyedAttribute(FlowAttribute key, std::string &value) { + const char *keyStr = FlowAttributeKey(key); + if (keyStr) { + std::string keyString = keyStr; + return FlowFile::getAttribute(keyString, value); + } else { + return false; + } } -bool FlowFileRecord::updateAttribute(std::string key, std::string value) -{ - std::map<std::string, std::string>::iterator it = _attributes.find(key); - if (it != _attributes.end()) - { - _attributes[key] = value; - return true; - } - else - { - return false; - } +FlowFileRecord &FlowFileRecord::operator=(const FlowFileRecord &other) { + core::FlowFile::operator=(other); + uuid_connection_ = other.uuid_connection_; + content_full_fath_ = other.content_full_fath_; + snapshot_ = other.snapshot_; + return *this; } -bool FlowFileRecord::getAttribute(FlowAttribute key, std::string &value) -{ - const char *keyStr = FlowAttributeKey(key); - if (keyStr) - { - std::string keyString = keyStr; - return getAttribute(keyString, value); - } - else - { - return false; - } +bool FlowFileRecord::DeSerialize(std::string key) { + std::string value; + bool ret; + + ret = flow_repository_->Get(key, value); + + if (!ret) { + logger_->log_error("NiFi FlowFile Store event %s can not found", + key.c_str()); + return false; + } else + logger_->log_debug("NiFi FlowFile Read event %s length %d", key.c_str(), + value.length()); + + io::DataStream stream((const uint8_t*) value.data(), value.length()); + + ret = DeSerialize(stream); + + if (ret) { + logger_->log_debug( + "NiFi FlowFile retrieve uuid %s size %d connection %s success", + uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str()); + } else { + logger_->log_debug( + "NiFi FlowFile retrieve uuid %s size %d connection %d fail", + uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str()); + } + + return ret; } -bool FlowFileRecord::getAttribute(std::string key, std::string &value) -{ - std::map<std::string, std::string>::iterator it = _attributes.find(key); - if (it != _attributes.end()) - { - value = it->second; - return true; - } - else - { - return false; - } +bool FlowFileRecord::Serialize() { + + io::DataStream outStream; + + int ret; + + ret = write(this->event_time_, &outStream); + if (ret != 8) { + + return false; + } + + ret = write(this->entry_date_, &outStream); + if (ret != 8) { + return false; + } + + ret = write(this->lineage_start_date_, &outStream); + if (ret != 8) { + + return false; + } + + ret = writeUTF(this->uuid_str_, &outStream); + if (ret <= 0) { + + return false; + } + + ret = writeUTF(this->uuid_connection_, &outStream); + if (ret <= 0) { + + return false; + } + // write flow attributes + uint32_t numAttributes = this->attributes_.size(); + ret = write(numAttributes, &outStream); + if (ret != 4) { + + return false; + } + + for (auto itAttribute : attributes_) { + ret = writeUTF(itAttribute.first, &outStream, true); + if (ret <= 0) { + + return false; + } + ret = writeUTF(itAttribute.second, &outStream, true); + if (ret <= 0) { + + return false; + } + } + + ret = writeUTF(this->content_full_fath_, &outStream); + if (ret <= 0) { + + return false; + } + + ret = write(this->size_, &outStream); + if (ret != 8) { + + return false; + } + + ret = write(this->offset_, &outStream); + if (ret != 8) { + + return false; + } + + // Persistent to the DB + + + if (flow_repository_->Put(uuid_str_, + const_cast<uint8_t*>(outStream.getBuffer()), + outStream.getSize())) { + logger_->log_debug("NiFi FlowFile Store event %s size %d success", + uuid_str_.c_str(), outStream.getSize()); + return true; + } else { + logger_->log_error("NiFi FlowFile Store event %s size %d fail", + uuid_str_.c_str(), outStream.getSize()); + return false; + } + + // cleanup + + return true; } -void FlowFileRecord::duplicate(FlowFileRecord *original) -{ - uuid_copy(this->_uuid, original->_uuid); - this->_attributes = original->_attributes; - this->_entryDate = original->_entryDate; - this->_id = original->_id; - this->_lastQueueDate = original->_lastQueueDate; - this->_lineageStartDate = original->_lineageStartDate; - this->_offset = original->_offset; - this->_penaltyExpirationMs = original->_penaltyExpirationMs; - this->_size = original->_size; - this->_lineageIdentifiers = original->_lineageIdentifiers; - this->_orginalConnection = original->_orginalConnection; - this->_uuidStr = original->_uuidStr; - this->_connection = original->_connection; - this->_markedDelete = original->_markedDelete; - - this->_claim = original->_claim; - if (this->_claim) - this->_claim->increaseFlowFileRecordOwnedCount(); - - this->_snapshot = true; +bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { + + int ret; + + io::DataStream outStream(buffer, bufferSize); + + ret = read(this->event_time_, &outStream); + if (ret != 8) { + return false; + } + + ret = read(this->entry_date_, &outStream); + if (ret != 8) { + return false; + } + + ret = read(this->lineage_start_date_, &outStream); + if (ret != 8) { + return false; + } + + ret = readUTF(this->uuid_str_, &outStream); + if (ret <= 0) { + return false; + } + + ret = readUTF(this->uuid_connection_, &outStream); + if (ret <= 0) { + return false; + } + + // read flow attributes + uint32_t numAttributes = 0; + ret = read(numAttributes, &outStream); + if (ret != 4) { + return false; + } + + for (uint32_t i = 0; i < numAttributes; i++) { + std::string key; + ret = readUTF(key, &outStream, true); + if (ret <= 0) { + return false; + } + std::string value; + ret = readUTF(value, &outStream, true); + if (ret <= 0) { + return false; + } + this->attributes_[key] = value; + } + + ret = readUTF(this->content_full_fath_, &outStream); + if (ret <= 0) { + return false; + } + + ret = read(this->size_, &outStream); + if (ret != 8) { + return false; + } + + ret = read(this->offset_, &outStream); + if (ret != 8) { + return false; + } + + return true; } +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowFileRepository.cpp b/libminifi/src/FlowFileRepository.cpp deleted file mode 100644 index 3388738..0000000 --- a/libminifi/src/FlowFileRepository.cpp +++ /dev/null @@ -1,282 +0,0 @@ -/** - * @file FlowFileRepository.cpp - * FlowFile implemenatation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <cstdint> -#include <vector> -#include <arpa/inet.h> -#include "io/DataStream.h" -#include "io/Serializable.h" -#include "FlowFileRecord.h" -#include "Relationship.h" -#include "Logger.h" -#include "FlowController.h" -#include "FlowFileRepository.h" - -//! DeSerialize -bool FlowFileEventRecord::DeSerialize(FlowFileRepository *repo, - std::string key) { - std::string value; - bool ret; - - ret = repo->Get(key, value); - - if (!ret) { - logger_->log_error("NiFi FlowFile Store event %s can not found", - key.c_str()); - return false; - } else - logger_->log_debug("NiFi FlowFile Read event %s length %d", - key.c_str(), value.length()); - - - DataStream stream((const uint8_t*)value.data(),value.length()); - - ret = DeSerialize(stream); - - if (ret) { - logger_->log_debug( - "NiFi FlowFile retrieve uuid %s size %d connection %s success", - _uuid.c_str(), stream.getSize(), _uuidConnection.c_str()); - } else { - logger_->log_debug( - "NiFi FlowFile retrieve uuid %s size %d connection %d fail", - _uuid.c_str(), stream.getSize(), _uuidConnection.c_str()); - } - - return ret; -} - -bool FlowFileEventRecord::Serialize(FlowFileRepository *repo) { - - DataStream outStream; - - int ret; - - ret = write(this->_eventTime,&outStream); - if (ret != 8) { - - return false; - } - - ret = write(this->_entryDate,&outStream); - if (ret != 8) { - return false; - } - - ret = write(this->_lineageStartDate,&outStream); - if (ret != 8) { - - return false; - } - - ret = writeUTF(this->_uuid,&outStream); - if (ret <= 0) { - - return false; - } - - ret = writeUTF(this->_uuidConnection,&outStream); - if (ret <= 0) { - - return false; - } - - // write flow attributes - uint32_t numAttributes = this->_attributes.size(); - ret = write(numAttributes,&outStream); - if (ret != 4) { - - return false; - } - - for (auto itAttribute : _attributes) { - ret = writeUTF(itAttribute.first,&outStream, true); - if (ret <= 0) { - - return false; - } - ret = writeUTF(itAttribute.second,&outStream, true); - if (ret <= 0) { - - return false; - } - } - - ret = writeUTF(this->_contentFullPath,&outStream); - if (ret <= 0) { - - return false; - } - - ret = write(this->_size,&outStream); - if (ret != 8) { - - return false; - } - - ret = write(this->_offset,&outStream); - if (ret != 8) { - - return false; - } - - // Persistent to the DB - - if (repo->Put(_uuid, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { - logger_->log_debug("NiFi FlowFile Store event %s size %d success", - _uuid.c_str(), outStream.getSize()); - return true; - } else { - logger_->log_error("NiFi FlowFile Store event %s size %d fail", - _uuid.c_str(), outStream.getSize()); - return false; - } - - // cleanup - - return true; -} - -bool FlowFileEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { - - int ret; - - DataStream outStream(buffer,bufferSize); - - ret = read(this->_eventTime,&outStream); - if (ret != 8) { - return false; - } - - ret = read(this->_entryDate,&outStream); - if (ret != 8) { - return false; - } - - ret = read(this->_lineageStartDate,&outStream); - if (ret != 8) { - return false; - } - - ret = readUTF(this->_uuid,&outStream); - if (ret <= 0) { - return false; - } - - ret = readUTF(this->_uuidConnection,&outStream); - if (ret <= 0) { - return false; - } - - // read flow attributes - uint32_t numAttributes = 0; - ret = read(numAttributes,&outStream); - if (ret != 4) { - return false; - } - - for (uint32_t i = 0; i < numAttributes; i++) { - std::string key; - ret = readUTF(key,&outStream, true); - if (ret <= 0) { - return false; - } - std::string value; - ret = readUTF(value,&outStream, true); - if (ret <= 0) { - return false; - } - this->_attributes[key] = value; - } - - ret = readUTF(this->_contentFullPath,&outStream); - if (ret <= 0) { - return false; - } - - ret = read(this->_size,&outStream); - if (ret != 8) { - return false; - } - - ret = read(this->_offset,&outStream); - if (ret != 8) { - return false; - } - - return true; -} - -void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, Connection *> *connectionMap) -{ -#ifdef LEVELDB_SUPPORT - if (!_enable) - return; - - std::vector<std::string> purgeList; - leveldb::Iterator* it = _db->NewIterator( - leveldb::ReadOptions()); - - for (it->SeekToFirst(); it->Valid(); it->Next()) - { - FlowFileEventRecord eventRead; - std::string key = it->key().ToString(); - if (eventRead.DeSerialize((uint8_t *) it->value().data(), - (int) it->value().size())) - { - auto search = connectionMap->find(eventRead.getConnectionUuid()); - if (search != connectionMap->end()) - { - // we find the connection for the persistent flowfile, create the flowfile and enqueue that - FlowFileRecord *record = new FlowFileRecord(&eventRead); - // set store to repo to true so that we do need to persistent again in enqueue - record->setStoredToRepository(true); - search->second->put(record); - } - else - { - if (eventRead.getContentFullPath().length() > 0) - { - std::remove(eventRead.getContentFullPath().c_str()); - } - purgeList.push_back(key); - } - } - else - { - purgeList.push_back(key); - } - } - - delete it; - std::vector<std::string>::iterator itPurge; - for (itPurge = purgeList.begin(); itPurge != purgeList.end(); - itPurge++) - { - std::string eventId = *itPurge; - logger_->log_info("Repository Repo %s Purge %s", - RepositoryTypeStr[_type], - eventId.c_str()); - Delete(eventId); - } -#endif - - return; -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/GenerateFlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/GenerateFlowFile.cpp b/libminifi/src/GenerateFlowFile.cpp deleted file mode 100644 index 12d7f70..0000000 --- a/libminifi/src/GenerateFlowFile.cpp +++ /dev/null @@ -1,135 +0,0 @@ -/** - * @file GenerateFlowFile.cpp - * GenerateFlowFile class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <random> -#include "utils/StringUtils.h" - -#include "GenerateFlowFile.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary"; -const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text"; -const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile"); -Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB"); -Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1"); -Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY); -Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles", - "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true"); -Relationship GenerateFlowFile::Success("success", "success operational on the flow record"); - -void GenerateFlowFile::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(FileSize); - properties.insert(BatchSize); - properties.insert(DataFormat); - properties.insert(UniqueFlowFiles); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void GenerateFlowFile::onTrigger(ProcessContext *context, ProcessSession *session) -{ - int64_t batchSize = 1; - bool uniqueFlowFile = true; - int64_t fileSize = 1024; - - std::string value; - if (context->getProperty(FileSize.getName(), value)) - { - Property::StringToInt(value, fileSize); - } - if (context->getProperty(BatchSize.getName(), value)) - { - Property::StringToInt(value, batchSize); - } - if (context->getProperty(UniqueFlowFiles.getName(), value)) - { - StringUtils::StringToBool(value, uniqueFlowFile); - } - - if (!uniqueFlowFile) - { - char *data; - data = new char[fileSize]; - if (!data) - return; - uint64_t dataSize = fileSize; - GenerateFlowFile::WriteCallback callback(data, dataSize); - char *current = data; - for (int i = 0; i < fileSize; i+= sizeof(int)) - { - int randValue = random(); - *((int *) current) = randValue; - current += sizeof(int); - } - for (int i = 0; i < batchSize; i++) - { - // For each batch - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - return; - if (fileSize > 0) - session->write(flowFile, &callback); - session->transfer(flowFile, Success); - } - delete[] data; - } - else - { - if (!_data) - { - // We have not create the unique data yet - _data = new char[fileSize]; - _dataSize = fileSize; - char *current = _data; - for (int i = 0; i < fileSize; i+= sizeof(int)) - { - int randValue = random(); - *((int *) current) = randValue; - // *((int *) current) = (0xFFFFFFFF & i); - current += sizeof(int); - } - } - GenerateFlowFile::WriteCallback callback(_data, _dataSize); - for (int i = 0; i < batchSize; i++) - { - // For each batch - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - return; - if (fileSize > 0) - session->write(flowFile, &callback); - session->transfer(flowFile, Success); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/GetFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/GetFile.cpp b/libminifi/src/GetFile.cpp deleted file mode 100644 index 40dd387..0000000 --- a/libminifi/src/GetFile.cpp +++ /dev/null @@ -1,329 +0,0 @@ -/** - * @file GetFile.cpp - * GetFile class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <time.h> -#include <sstream> -#include <stdio.h> -#include <string> -#include <iostream> -#include <dirent.h> -#include <limits.h> -#include <unistd.h> -#if (__GNUC__ >= 4) - #if (__GNUC_MINOR__ < 9) - #include <regex.h> - #endif -#endif -#include "utils/StringUtils.h" -#include <regex> -#include "utils/TimeUtil.h" -#include "GetFile.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string GetFile::ProcessorName("GetFile"); -Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10"); -Property GetFile::Directory("Input Directory", "The input directory from which to pull files", "."); -Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true"); -Property GetFile::KeepSourceFile("Keep Source File", - "If true, the file is not deleted after it has been copied to the Content Repository", "false"); -Property GetFile::MaxAge("Maximum File Age", - "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored", "0 sec"); -Property GetFile::MinAge("Minimum File Age", - "The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored", "0 sec"); -Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B"); -Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B"); -Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait before performing a directory listing", "0 sec"); -Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not to pull files from subdirectories", "true"); -Property GetFile::FileFilter("File Filter", "Only files whose names match the given regular expression will be picked up", "[^\\.].*"); -Relationship GetFile::Success("success", "All files are routed to success"); - -void GetFile::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(BatchSize); - properties.insert(Directory); - properties.insert(IgnoreHiddenFile); - properties.insert(KeepSourceFile); - properties.insert(MaxAge); - properties.insert(MinAge); - properties.insert(MaxSize); - properties.insert(MinSize); - properties.insert(PollInterval); - properties.insert(Recurse); - properties.insert(FileFilter); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string value; - - logger_->log_info("onTrigger GetFile"); - if (context->getProperty(Directory.getName(), value)) - { - _directory = value; - } - if (context->getProperty(BatchSize.getName(), value)) - { - Property::StringToInt(value, _batchSize); - } - if (context->getProperty(IgnoreHiddenFile.getName(), value)) - { - StringUtils::StringToBool(value, _ignoreHiddenFile); - } - if (context->getProperty(KeepSourceFile.getName(), value)) - { - StringUtils::StringToBool(value, _keepSourceFile); - } - - logger_->log_info("onTrigger GetFile"); - if (context->getProperty(MaxAge.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _maxAge, unit) && - Property::ConvertTimeUnitToMS(_maxAge, unit, _maxAge)) - { - - } - } - if (context->getProperty(MinAge.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _minAge, unit) && - Property::ConvertTimeUnitToMS(_minAge, unit, _minAge)) - { - - } - } - if (context->getProperty(MaxSize.getName(), value)) - { - Property::StringToInt(value, _maxSize); - } - if (context->getProperty(MinSize.getName(), value)) - { - Property::StringToInt(value, _minSize); - } - if (context->getProperty(PollInterval.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _pollInterval, unit) && - Property::ConvertTimeUnitToMS(_pollInterval, unit, _pollInterval)) - { - - } - } - if (context->getProperty(Recurse.getName(), value)) - { - StringUtils::StringToBool(value, _recursive); - } - - if (context->getProperty(FileFilter.getName(), value)) - { - _fileFilter = value; - } - - // Perform directory list - logger_->log_info("Is listing empty %i",isListingEmpty()); - if (isListingEmpty()) - { - if (_pollInterval == 0 || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval) - { - performListing(_directory); - } - } - logger_->log_info("Is listing empty %i",isListingEmpty()); - - if (!isListingEmpty()) - { - try - { - std::queue<std::string> list; - pollListing(list, _batchSize); - while (!list.empty()) - { - - std::string fileName = list.front(); - list.pop(); - logger_->log_info("GetFile process %s", fileName.c_str()); - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - return; - std::size_t found = fileName.find_last_of("/\\"); - std::string path = fileName.substr(0,found); - std::string name = fileName.substr(found+1); - flowFile->updateAttribute(FILENAME, name); - flowFile->updateAttribute(PATH, path); - flowFile->addAttribute(ABSOLUTE_PATH, fileName); - session->import(fileName, flowFile, _keepSourceFile); - session->transfer(flowFile, Success); - } - } - catch (std::exception &exception) - { - logger_->log_debug("GetFile Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - throw; - } - } - -} - -bool GetFile::isListingEmpty() -{ - std::lock_guard<std::mutex> lock(_mtx); - - return _dirList.empty(); -} - -void GetFile::putListing(std::string fileName) -{ - std::lock_guard<std::mutex> lock(_mtx); - - _dirList.push(fileName); -} - -void GetFile::pollListing(std::queue<std::string> &list, int maxSize) -{ - std::lock_guard<std::mutex> lock(_mtx); - - while (!_dirList.empty() && (maxSize == 0 || list.size() < maxSize)) - { - std::string fileName = _dirList.front(); - _dirList.pop(); - list.push(fileName); - } - - return; -} - -bool GetFile::acceptFile(std::string fullName, std::string name) -{ - struct stat statbuf; - - if (stat(fullName.c_str(), &statbuf) == 0) - { - if (_minSize > 0 && statbuf.st_size <_minSize) - return false; - - if (_maxSize > 0 && statbuf.st_size > _maxSize) - return false; - - uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); - uint64_t fileAge = getTimeMillis() - modifiedTime; - if (_minAge > 0 && fileAge < _minAge) - return false; - if (_maxAge > 0 && fileAge > _maxAge) - return false; - - if (_ignoreHiddenFile && fullName.c_str()[0] == '.') - return false; - - if (access(fullName.c_str(), R_OK) != 0) - return false; - - if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0) - return false; - - - #ifdef __GNUC__ - #if (__GNUC__ >= 4) - #if (__GNUC_MINOR__ < 9) - regex_t regex; - int ret = regcomp(®ex, _fileFilter.c_str(),0); - if (ret) - return false; - ret = regexec(®ex,name.c_str(),(size_t)0,NULL,0); - regfree(®ex); - if (ret) - return false; - #else - try{ - std::regex re(_fileFilter); - - if (!std::regex_match(name, re)) { - return false; - } - } catch (std::regex_error e) { - logger_->log_error("Invalid File Filter regex: %s.", e.what()); - return false; - } - #endif - #endif - #else - logger_->log_info("Cannot support regex filtering"); - #endif - return true; - } - - return false; -} - -void GetFile::performListing(std::string dir) -{ - logger_->log_info("Performing file listing against %s",dir.c_str()); - DIR *d; - d = opendir(dir.c_str()); - if (!d) - return; - // only perform a listing while we are not empty - logger_->log_info("Performing file listing against %s",dir.c_str()); - while (isRunning()) - { - struct dirent *entry; - entry = readdir(d); - if (!entry) - break; - std::string d_name = entry->d_name; - if ((entry->d_type & DT_DIR)) - { - // if this is a directory - if (_recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) - { - std::string path = dir + "/" + d_name; - performListing(path); - } - } - else - { - std::string fileName = dir + "/" + d_name; - if (acceptFile(fileName, d_name)) - { - // check whether we can take this file - putListing(fileName); - } - } - } - closedir(d); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ListenHTTP.cpp b/libminifi/src/ListenHTTP.cpp deleted file mode 100644 index 89ce1d2..0000000 --- a/libminifi/src/ListenHTTP.cpp +++ /dev/null @@ -1,395 +0,0 @@ -/** - * @file ListenHTTP.cpp - * ListenHTTP class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <sstream> -#include <stdio.h> -#include <string> -#include <iostream> -#include <fstream> -#include <uuid/uuid.h> - -#include <CivetServer.h> - -#include "ListenHTTP.h" - -#include "utils/TimeUtil.h" -#include "ProcessContext.h" -#include "ProcessSession.h" -#include "ProcessSessionFactory.h" - -const std::string ListenHTTP::ProcessorName("ListenHTTP"); - -Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener"); -Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", ""); -Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.", ".*"); -Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", ""); -Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", ""); -Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no"); -Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2"); -Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes", ""); - -Relationship ListenHTTP::Success("success", "All files are routed to success"); - -void ListenHTTP::initialize() -{ - _logger->log_info("Initializing ListenHTTP"); - - //! Set the supported properties - std::set<Property> properties; - properties.insert(BasePath); - properties.insert(Port); - properties.insert(AuthorizedDNPattern); - properties.insert(SSLCertificate); - properties.insert(SSLCertificateAuthority); - properties.insert(SSLVerifyPeer); - properties.insert(SSLMinimumVersion); - properties.insert(HeadersAsAttributesRegex); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void ListenHTTP::onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory) -{ - - std::string basePath; - - if (!context->getProperty(BasePath.getName(), basePath)) - { - _logger->log_info("%s attribute is missing, so default value of %s will be used", - BasePath.getName().c_str(), - BasePath.getValue().c_str()); - basePath = BasePath.getValue(); - } - - basePath.insert(0, "/"); - - std::string listeningPort; - - if (!context->getProperty(Port.getName(), listeningPort)) - { - _logger->log_error("%s attribute is missing or invalid", - Port.getName().c_str()); - return; - } - - std::string authDNPattern; - - if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) - { - _logger->log_info("ListenHTTP using %s: %s", - AuthorizedDNPattern.getName().c_str(), - authDNPattern.c_str()); - } - - std::string sslCertFile; - - if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) - { - _logger->log_info("ListenHTTP using %s: %s", - SSLCertificate.getName().c_str(), - sslCertFile.c_str()); - } - - // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set - std::string sslCertAuthorityFile; - std::string sslVerifyPeer; - std::string sslMinVer; - - if (!sslCertFile.empty()) - { - if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) - && !sslCertAuthorityFile.empty()) - { - _logger->log_info("ListenHTTP using %s: %s", - SSLCertificateAuthority.getName().c_str(), - sslCertAuthorityFile.c_str()); - } - - if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) - { - if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) - { - _logger->log_info("ListenHTTP will not verify peers"); - } - else - { - _logger->log_info("ListenHTTP will verify peers"); - } - } - else - { - _logger->log_info("ListenHTTP will not verify peers"); - } - - if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) - { - _logger->log_info("ListenHTTP using %s: %s", - SSLMinimumVersion.getName().c_str(), - sslMinVer.c_str()); - } - } - - std::string headersAsAttributesPattern; - - if (context->getProperty(HeadersAsAttributesRegex.getName(),headersAsAttributesPattern) - && !headersAsAttributesPattern.empty()) - { - _logger->log_info("ListenHTTP using %s: %s", - HeadersAsAttributesRegex.getName().c_str(), - headersAsAttributesPattern.c_str()); - } - - auto numThreads = getMaxConcurrentTasks(); - - _logger->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", - listeningPort.c_str(), - basePath.c_str(), - numThreads); - - // Initialize web server - std::vector<std::string> options; - options.push_back("enable_keep_alive"); - options.push_back("yes"); - options.push_back("keep_alive_timeout_ms"); - options.push_back("15000"); - options.push_back("num_threads"); - options.push_back(std::to_string(numThreads)); - - if (sslCertFile.empty()) - { - options.push_back("listening_ports"); - options.push_back(listeningPort); - } - else - { - listeningPort += "s"; - options.push_back("listening_ports"); - options.push_back(listeningPort); - - options.push_back("ssl_certificate"); - options.push_back(sslCertFile); - - if (!sslCertAuthorityFile.empty()) - { - options.push_back("ssl_ca_file"); - options.push_back(sslCertAuthorityFile); - } - - if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) - { - options.push_back("ssl_verify_peer"); - options.push_back("no"); - } - else - { - options.push_back("ssl_verify_peer"); - options.push_back("yes"); - } - - if (sslMinVer.compare("SSL2") == 0) - { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(0)); - } - else if (sslMinVer.compare("SSL3") == 0) - { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(1)); - } - else if (sslMinVer.compare("TLS1.0") == 0) - { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(2)); - } - else if (sslMinVer.compare("TLS1.1") == 0) - { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(3)); - } - else - { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(4)); - } - } - - _server.reset(new CivetServer(options)); - _handler.reset(new Handler(context, - sessionFactory, - std::move(authDNPattern), - std::move(headersAsAttributesPattern))); - _server->addHandler(basePath, _handler.get()); -} - -void ListenHTTP::onTrigger(ProcessContext *context, ProcessSession *session) -{ - - FlowFileRecord *flowFile = session->get(); - - // Do nothing if there are no incoming files - if (!flowFile) - { - return; - } -} - -ListenHTTP::Handler::Handler(ProcessContext *context, - ProcessSessionFactory *sessionFactory, - std::string &&authDNPattern, - std::string &&headersAsAttributesPattern) -: _authDNRegex(std::move(authDNPattern)) -, _headersAsAttributesRegex(std::move(headersAsAttributesPattern)) -{ - _processContext = context; - _processSessionFactory = sessionFactory; -} - -void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) -{ - mg_printf(conn, - "HTTP/1.1 500 Internal Server Error\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); -} - -bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) -{ - _logger = Logger::getLogger(); - - auto req_info = mg_get_request_info(conn); - _logger->log_info("ListenHTTP handling POST request of length %d", req_info->content_length); - - // If this is a two-way TLS connection, authorize the peer against the configured pattern - if (req_info->is_ssl && req_info->client_cert != nullptr) - { - if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) - { - mg_printf(conn, - "HTTP/1.1 403 Forbidden\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); - _logger->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject); - return true; - } - } - - // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html) - mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n"); - - auto session = _processSessionFactory->createSession(); - ListenHTTP::WriteCallback callback(conn, req_info); - auto flowFile = session->create(); - - if (!flowFile) - { - sendErrorResponse(conn); - return true; - } - - try - { - session->write(flowFile, &callback); - - // Add filename from "filename" header value (and pattern headers) - for (int i = 0; i < req_info->num_headers; i++) - { - auto header = &req_info->http_headers[i]; - - if (strcmp("filename", header->name) == 0) - { - if (!flowFile->updateAttribute("filename", header->value)) - { - flowFile->addAttribute("filename", header->value); - } - } - else if (std::regex_match(header->name, _headersAsAttributesRegex)) - { - if (!flowFile->updateAttribute(header->name, header->value)) - { - flowFile->addAttribute(header->name, header->value); - } - } - } - - session->transfer(flowFile, Success); - session->commit(); - } - catch (std::exception &exception) - { - _logger->log_debug("ListenHTTP Caught Exception %s", exception.what()); - sendErrorResponse(conn); - session->rollback(); - throw; - } - catch (...) - { - _logger->log_debug("ListenHTTP Caught Exception Processor::onTrigger"); - sendErrorResponse(conn); - session->rollback(); - throw; - } - - mg_printf(conn, - "HTTP/1.1 200 OK\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); - - return true; -} - -ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) -{ - _logger = Logger::getLogger(); - _conn = conn; - _reqInfo = reqInfo; -} - -void ListenHTTP::WriteCallback::process(std::ofstream *stream) -{ - long long rlen; - long long nlen = 0; - long long tlen = _reqInfo->content_length; - char buf[16384]; - - while (nlen < tlen) - { - rlen = tlen - nlen; - - if (rlen > sizeof(buf)) - { - rlen = sizeof(buf); - } - - // Read a buffer of data from client - rlen = mg_read(_conn, &buf[0], (size_t)rlen); - - if (rlen <= 0) - { - break; - } - - // Transfer buffer data to the output stream - stream->write(&buf[0], rlen); - - nlen += rlen; - } -}
