http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/ExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp new file mode 100644 index 0000000..3cbbc1b --- /dev/null +++ b/libminifi/src/processors/ExecuteProcess.cpp @@ -0,0 +1,255 @@ +/** + * @file ExecuteProcess.cpp + * ExecuteProcess class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "processors/ExecuteProcess.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include <cstring> +#include "utils/StringUtils.h" +#include "utils/TimeUtil.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +const std::string ExecuteProcess::ProcessorName("ExecuteProcess"); +core::Property ExecuteProcess::Command( + "Command", + "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.", + ""); +core::Property ExecuteProcess::CommandArguments( + "Command Arguments", + "The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.", + ""); +core::Property ExecuteProcess::WorkingDir( + "Working Directory", + "The directory to use as the current working directory when executing the command", + ""); +core::Property ExecuteProcess::BatchDuration( + "Batch Duration", + "If the process is expected to be long-running and produce textual output, a batch duration can be specified.", + "0"); +core::Property ExecuteProcess::RedirectErrorStream( + "Redirect Error Stream", + "If true will redirect any error stream output of the process to the output stream.", + "false"); +core::Relationship ExecuteProcess::Success( + "success", "All created FlowFiles are routed to this relationship."); + +void ExecuteProcess::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(Command); + properties.insert(CommandArguments); + properties.insert(WorkingDir); + properties.insert(BatchDuration); + properties.insert(RedirectErrorStream); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void ExecuteProcess::onTrigger( + core::ProcessContext *context, + core::ProcessSession *session) { + std::string value; + if (context->getProperty(Command.getName(), value)) { + this->_command = value; + } + if (context->getProperty(CommandArguments.getName(), value)) { + this->_commandArgument = value; + } + if (context->getProperty(WorkingDir.getName(), value)) { + this->_workingDir = value; + } + if (context->getProperty(BatchDuration.getName(), value)) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, + _batchDuration, + unit) + && core::Property::ConvertTimeUnitToMS( + _batchDuration, unit, _batchDuration)) { + + } + } + if (context->getProperty(RedirectErrorStream.getName(), value)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool( + value, _redirectErrorStream); + } + this->_fullCommand = _command + " " + _commandArgument; + if (_fullCommand.length() == 0) { + yield(); + return; + } + if (_workingDir.length() > 0 && _workingDir != ".") { + // change to working directory + if (chdir(_workingDir.c_str()) != 0) { + logger_->log_error("Execute Command can not chdir %s", + _workingDir.c_str()); + yield(); + return; + } + } + logger_->log_info("Execute Command %s", _fullCommand.c_str()); + // split the command into array + char cstr[_fullCommand.length() + 1]; + std::strcpy(cstr, _fullCommand.c_str()); + char *p = std::strtok(cstr, " "); + int argc = 0; + char *argv[64]; + while (p != 0 && argc < 64) { + argv[argc] = p; + p = std::strtok(NULL, " "); + argc++; + } + argv[argc] = NULL; + int status, died; + if (!_processRunning) { + _processRunning = true; + // if the process has not launched yet + // create the pipe + if (pipe(_pipefd) == -1) { + _processRunning = false; + yield(); + return; + } + switch (_pid = fork()) { + case -1: + logger_->log_error("Execute Process fork failed"); + _processRunning = false; + close(_pipefd[0]); + close(_pipefd[1]); + yield(); + break; + case 0: // this is the code the child runs + close(1); // close stdout + dup(_pipefd[1]); // points pipefd at file descriptor + if (_redirectErrorStream) + // redirect stderr + dup2(_pipefd[1], 2); + close(_pipefd[0]); + execvp(argv[0], argv); + exit(1); + break; + default: // this is the code the parent runs + // the parent isn't going to write to the pipe + close(_pipefd[1]); + if (_batchDuration > 0) { + while (1) { + std::this_thread::sleep_for( + std::chrono::milliseconds(_batchDuration)); + char buffer[4096]; + int numRead = read(_pipefd[0], buffer, sizeof(buffer)); + if (numRead <= 0) + break; + logger_->log_info("Execute Command Respond %d", numRead); + ExecuteProcess::WriteCallback callback(buffer, numRead); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< + FlowFileRecord>(session->create()); + if (!flowFile) + continue; + flowFile->addAttribute("command", _command.c_str()); + flowFile->addAttribute("command.arguments", + _commandArgument.c_str()); + session->write(flowFile, &callback); + session->transfer(flowFile, Success); + session->commit(); + } + } else { + char buffer[4096]; + char *bufPtr = buffer; + int totalRead = 0; + std::shared_ptr<FlowFileRecord> flowFile = nullptr; + while (1) { + int numRead = read(_pipefd[0], bufPtr, + (sizeof(buffer) - totalRead)); + if (numRead <= 0) { + if (totalRead > 0) { + logger_->log_info("Execute Command Respond %d", totalRead); + // child exits and close the pipe + ExecuteProcess::WriteCallback callback(buffer, totalRead); + if (!flowFile) { + flowFile = std::static_pointer_cast<FlowFileRecord>( + session->create()); + if (!flowFile) + break; + flowFile->addAttribute("command", _command.c_str()); + flowFile->addAttribute("command.arguments", + _commandArgument.c_str()); + session->write(flowFile, &callback); + } else { + session->append(flowFile, &callback); + } + session->transfer(flowFile, Success); + } + break; + } else { + if (numRead == (sizeof(buffer) - totalRead)) { + // we reach the max buffer size + logger_->log_info("Execute Command Max Respond %d", + sizeof(buffer)); + ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer)); + if (!flowFile) { + flowFile = std::static_pointer_cast<FlowFileRecord>( + session->create()); + if (!flowFile) + continue; + flowFile->addAttribute("command", _command.c_str()); + flowFile->addAttribute("command.arguments", + _commandArgument.c_str()); + session->write(flowFile, &callback); + } else { + session->append(flowFile, &callback); + } + // Rewind + totalRead = 0; + bufPtr = buffer; + } else { + totalRead += numRead; + bufPtr += numRead; + } + } + } + } + + died = wait(&status); + if (WIFEXITED(status)) { + logger_->log_info("Execute Command Complete %s status %d pid %d", + _fullCommand.c_str(), WEXITSTATUS(status), _pid); + } else { + logger_->log_info("Execute Command Complete %s status %d pid %d", + _fullCommand.c_str(), WTERMSIG(status), _pid); + } + + close(_pipefd[0]); + _processRunning = false; + break; + } + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/GenerateFlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp new file mode 100644 index 0000000..ebdaaa3 --- /dev/null +++ b/libminifi/src/processors/GenerateFlowFile.cpp @@ -0,0 +1,145 @@ +/** + * @file GenerateFlowFile.cpp + * GenerateFlowFile class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <random> +#include "utils/StringUtils.h" + +#include "processors/GenerateFlowFile.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { +const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary"; +const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text"; +const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile"); +core::Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB"); +core::Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1"); +core::Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY); +core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles", + "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true"); +core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record"); + +void GenerateFlowFile::initialize() +{ + // Set the supported properties + std::set<core::Property> properties; + properties.insert(FileSize); + properties.insert(BatchSize); + properties.insert(DataFormat); + properties.insert(UniqueFlowFiles); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) +{ + int64_t batchSize = 1; + bool uniqueFlowFile = true; + int64_t fileSize = 1024; + + std::string value; + if (context->getProperty(FileSize.getName(), value)) + { + core::Property::StringToInt(value, fileSize); + } + if (context->getProperty(BatchSize.getName(), value)) + { + core::Property::StringToInt(value, batchSize); + } + if (context->getProperty(UniqueFlowFiles.getName(), value)) + { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, uniqueFlowFile); + } + + if (!uniqueFlowFile) + { + char *data; + data = new char[fileSize]; + if (!data) + return; + uint64_t dataSize = fileSize; + GenerateFlowFile::WriteCallback callback(data, dataSize); + char *current = data; + for (int i = 0; i < fileSize; i+= sizeof(int)) + { + int randValue = random(); + *((int *) current) = randValue; + current += sizeof(int); + } + for (int i = 0; i < batchSize; i++) + { + // For each batch + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + if (!flowFile) + return; + if (fileSize > 0) + session->write(flowFile, &callback); + session->transfer(flowFile, Success); + } + delete[] data; + } + else + { + if (!_data) + { + // We have not create the unique data yet + _data = new char[fileSize]; + _dataSize = fileSize; + char *current = _data; + for (int i = 0; i < fileSize; i+= sizeof(int)) + { + int randValue = random(); + *((int *) current) = randValue; + // *((int *) current) = (0xFFFFFFFF & i); + current += sizeof(int); + } + } + GenerateFlowFile::WriteCallback callback(_data, _dataSize); + for (int i = 0; i < batchSize; i++) + { + // For each batch + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + if (!flowFile) + return; + if (fileSize > 0) + session->write(flowFile, &callback); + session->transfer(flowFile, Success); + } + } +} +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/GetFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp new file mode 100644 index 0000000..cf05657 --- /dev/null +++ b/libminifi/src/processors/GetFile.cpp @@ -0,0 +1,340 @@ +/** + * @file GetFile.cpp + * GetFile class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <time.h> +#include <sstream> +#include <stdio.h> +#include <string> +#include <iostream> +#include <dirent.h> +#include <limits.h> +#include <unistd.h> +#if (__GNUC__ >= 4) +#if (__GNUC_MINOR__ < 9) +#include <regex.h> +#endif +#endif +#include "utils/StringUtils.h" +#include <regex> +#include "utils/TimeUtil.h" +#include "processors/GetFile.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { +const std::string GetFile::ProcessorName("GetFile"); +core::Property GetFile::BatchSize( + "Batch Size", "The maximum number of files to pull in each iteration", + "10"); +core::Property GetFile::Directory( + "Input Directory", "The input directory from which to pull files", "."); +core::Property GetFile::IgnoreHiddenFile( + "Ignore Hidden Files", + "Indicates whether or not hidden files should be ignored", "true"); +core::Property GetFile::KeepSourceFile( + "Keep Source File", + "If true, the file is not deleted after it has been copied to the Content Repository", + "false"); +core::Property GetFile::MaxAge( + "Maximum File Age", + "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored", + "0 sec"); +core::Property GetFile::MinAge( + "Minimum File Age", + "The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored", + "0 sec"); +core::Property GetFile::MaxSize( + "Maximum File Size", + "The maximum size that a file can be in order to be pulled", "0 B"); +core::Property GetFile::MinSize( + "Minimum File Size", + "The minimum size that a file must be in order to be pulled", "0 B"); +core::Property GetFile::PollInterval( + "Polling Interval", + "Indicates how long to wait before performing a directory listing", + "0 sec"); +core::Property GetFile::Recurse( + "Recurse Subdirectories", + "Indicates whether or not to pull files from subdirectories", "true"); +core::Property GetFile::FileFilter( + "File Filter", + "Only files whose names match the given regular expression will be picked up", + "[^\\.].*"); +core::Relationship GetFile::Success( + "success", "All files are routed to success"); + +void GetFile::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(BatchSize); + properties.insert(Directory); + properties.insert(IgnoreHiddenFile); + properties.insert(KeepSourceFile); + properties.insert(MaxAge); + properties.insert(MinAge); + properties.insert(MaxSize); + properties.insert(MinSize); + properties.insert(PollInterval); + properties.insert(Recurse); + properties.insert(FileFilter); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void GetFile::onTrigger( + core::ProcessContext *context, + core::ProcessSession *session) { + std::string value; + + logger_->log_info("onTrigger GetFile"); + if (context->getProperty(Directory.getName(), value)) { + _directory = value; + } + if (context->getProperty(BatchSize.getName(), value)) { + core::Property::StringToInt(value, _batchSize); + } + if (context->getProperty(IgnoreHiddenFile.getName(), value)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool( + value, _ignoreHiddenFile); + } + if (context->getProperty(KeepSourceFile.getName(), value)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool( + value, _keepSourceFile); + } + + logger_->log_info("onTrigger GetFile"); + if (context->getProperty(MaxAge.getName(), value)) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, _maxAge, + unit) + && core::Property::ConvertTimeUnitToMS( + _maxAge, unit, _maxAge)) { + + } + } + if (context->getProperty(MinAge.getName(), value)) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, _minAge, + unit) + && core::Property::ConvertTimeUnitToMS( + _minAge, unit, _minAge)) { + + } + } + if (context->getProperty(MaxSize.getName(), value)) { + core::Property::StringToInt(value, _maxSize); + } + if (context->getProperty(MinSize.getName(), value)) { + core::Property::StringToInt(value, _minSize); + } + if (context->getProperty(PollInterval.getName(), value)) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, + _pollInterval, + unit) + && core::Property::ConvertTimeUnitToMS( + _pollInterval, unit, _pollInterval)) { + + } + } + if (context->getProperty(Recurse.getName(), value)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, + _recursive); + } + + if (context->getProperty(FileFilter.getName(), value)) { + _fileFilter = value; + } + + // Perform directory list + logger_->log_info("Is listing empty %i", isListingEmpty()); + if (isListingEmpty()) { + if (_pollInterval == 0 + || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval) { + performListing(_directory); + } + } + logger_->log_info("Is listing empty %i", isListingEmpty()); + + if (!isListingEmpty()) { + try { + std::queue<std::string> list; + pollListing(list, _batchSize); + while (!list.empty()) { + + std::string fileName = list.front(); + list.pop(); + logger_->log_info("GetFile process %s", fileName.c_str()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< + FlowFileRecord>(session->create()); + if (flowFile == nullptr) + return; + std::size_t found = fileName.find_last_of("/\\"); + std::string path = fileName.substr(0, found); + std::string name = fileName.substr(found + 1); + flowFile->updateKeyedAttribute(FILENAME, name); + flowFile->updateKeyedAttribute(PATH, path); + flowFile->addKeyedAttribute(ABSOLUTE_PATH, fileName); + session->import(fileName, flowFile, _keepSourceFile); + session->transfer(flowFile, Success); + } + } catch (std::exception &exception) { + logger_->log_debug("GetFile Caught Exception %s", exception.what()); + throw; + } catch (...) { + throw; + } + } + +} + +bool GetFile::isListingEmpty() { + std::lock_guard<std::mutex> lock(mutex_); + + return _dirList.empty(); +} + +void GetFile::putListing(std::string fileName) { + std::lock_guard<std::mutex> lock(mutex_); + + _dirList.push(fileName); +} + +void GetFile::pollListing(std::queue<std::string> &list, int maxSize) { + std::lock_guard<std::mutex> lock(mutex_); + + while (!_dirList.empty() && (maxSize == 0 || list.size() < maxSize)) { + std::string fileName = _dirList.front(); + _dirList.pop(); + list.push(fileName); + } + + return; +} + +bool GetFile::acceptFile(std::string fullName, std::string name) { + struct stat statbuf; + + if (stat(fullName.c_str(), &statbuf) == 0) { + if (_minSize > 0 && statbuf.st_size < _minSize) + return false; + + if (_maxSize > 0 && statbuf.st_size > _maxSize) + return false; + + uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); + uint64_t fileAge = getTimeMillis() - modifiedTime; + if (_minAge > 0 && fileAge < _minAge) + return false; + if (_maxAge > 0 && fileAge > _maxAge) + return false; + + if (_ignoreHiddenFile && fullName.c_str()[0] == '.') + return false; + + if (access(fullName.c_str(), R_OK) != 0) + return false; + + if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0) + return false; + +#ifdef __GNUC__ +#if (__GNUC__ >= 4) +#if (__GNUC_MINOR__ < 9) + regex_t regex; + int ret = regcomp(®ex, _fileFilter.c_str(), 0); + if (ret) + return false; + ret = regexec(®ex, name.c_str(), (size_t) 0, NULL, 0); + regfree(®ex); + if (ret) + return false; +#else + try { + std::regex re(_fileFilter); + + if (!std::regex_match(name, re)) { + return false; + } + } catch (std::regex_error e) { + logger_->log_error("Invalid File Filter regex: %s.", e.what()); + return false; + } +#endif +#endif +#else + logger_->log_info("Cannot support regex filtering"); +#endif + return true; + } + + return false; +} + +void GetFile::performListing(std::string dir) { + logger_->log_info("Performing file listing against %s", dir.c_str()); + DIR *d; + d = opendir(dir.c_str()); + if (!d) + return; + // only perform a listing while we are not empty + logger_->log_info("Performing file listing against %s", dir.c_str()); + while (isRunning()) { + struct dirent *entry; + entry = readdir(d); + if (!entry) + break; + std::string d_name = entry->d_name; + if ((entry->d_type & DT_DIR)) { + // if this is a directory + if (_recursive && strcmp(d_name.c_str(), "..") != 0 + && strcmp(d_name.c_str(), ".") != 0) { + std::string path = dir + "/" + d_name; + performListing(path); + } + } else { + std::string fileName = dir + "/" + d_name; + if (acceptFile(fileName, d_name)) { + // check whether we can take this file + putListing(fileName); + } + } + } + closedir(d); +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp new file mode 100644 index 0000000..36c743e --- /dev/null +++ b/libminifi/src/processors/ListenHTTP.cpp @@ -0,0 +1,380 @@ +/** + * @file ListenHTTP.cpp + * ListenHTTP class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <sstream> +#include <stdio.h> +#include <string> +#include <iostream> +#include <fstream> +#include <uuid/uuid.h> + +#include <CivetServer.h> + +#include "processors/ListenHTTP.h" + +#include "utils/TimeUtil.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessSessionFactory.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +const std::string ListenHTTP::ProcessorName("ListenHTTP"); + +core::Property ListenHTTP::BasePath( + "Base Path", "Base path for incoming connections", "contentListener"); +core::Property ListenHTTP::Port( + "Listening Port", "The Port to listen on for incoming connections", ""); +core::Property ListenHTTP::AuthorizedDNPattern( + "Authorized DN Pattern", + "A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.", + ".*"); +core::Property ListenHTTP::SSLCertificate( + "SSL Certificate", + "File containing PEM-formatted file including TLS/SSL certificate and key", + ""); +core::Property ListenHTTP::SSLCertificateAuthority( + "SSL Certificate Authority", + "File containing trusted PEM-formatted certificates", ""); +core::Property ListenHTTP::SSLVerifyPeer( + "SSL Verify Peer", + "Whether or not to verify the client's certificate (yes/no)", "no"); +core::Property ListenHTTP::SSLMinimumVersion( + "SSL Minimum Version", + "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", + "SSL2"); +core::Property ListenHTTP::HeadersAsAttributesRegex( + "HTTP Headers to receive as Attributes (Regex)", + "Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes", + ""); + +core::Relationship ListenHTTP::Success( + "success", "All files are routed to success"); + +void ListenHTTP::initialize() { + _logger->log_info("Initializing ListenHTTP"); + + // Set the supported properties + std::set < core::Property > properties; + properties.insert(BasePath); + properties.insert(Port); + properties.insert(AuthorizedDNPattern); + properties.insert(SSLCertificate); + properties.insert(SSLCertificateAuthority); + properties.insert(SSLVerifyPeer); + properties.insert(SSLMinimumVersion); + properties.insert(HeadersAsAttributesRegex); + setSupportedProperties (properties); + // Set the supported relationships + std::set < core::Relationship > relationships; + relationships.insert(Success); + setSupportedRelationships (relationships); +} + +void ListenHTTP::onSchedule( + core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory) { + + std::string basePath; + + if (!context->getProperty(BasePath.getName(), basePath)) { + _logger->log_info( + "%s attribute is missing, so default value of %s will be used", + BasePath.getName().c_str(), BasePath.getValue().c_str()); + basePath = BasePath.getValue(); + } + + basePath.insert(0, "/"); + + std::string listeningPort; + + if (!context->getProperty(Port.getName(), listeningPort)) { + _logger->log_error("%s attribute is missing or invalid", + Port.getName().c_str()); + return; + } + + std::string authDNPattern; + + if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) + && !authDNPattern.empty()) { + _logger->log_info("ListenHTTP using %s: %s", + AuthorizedDNPattern.getName().c_str(), + authDNPattern.c_str()); + } + + std::string sslCertFile; + + if (context->getProperty(SSLCertificate.getName(), sslCertFile) + && !sslCertFile.empty()) { + _logger->log_info("ListenHTTP using %s: %s", + SSLCertificate.getName().c_str(), sslCertFile.c_str()); + } + + // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set + std::string sslCertAuthorityFile; + std::string sslVerifyPeer; + std::string sslMinVer; + + if (!sslCertFile.empty()) { + if (context->getProperty(SSLCertificateAuthority.getName(), + sslCertAuthorityFile) + && !sslCertAuthorityFile.empty()) { + _logger->log_info("ListenHTTP using %s: %s", + SSLCertificateAuthority.getName().c_str(), + sslCertAuthorityFile.c_str()); + } + + if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) { + if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { + _logger->log_info("ListenHTTP will not verify peers"); + } else { + _logger->log_info("ListenHTTP will verify peers"); + } + } else { + _logger->log_info("ListenHTTP will not verify peers"); + } + + if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) { + _logger->log_info("ListenHTTP using %s: %s", + SSLMinimumVersion.getName().c_str(), sslMinVer.c_str()); + } + } + + std::string headersAsAttributesPattern; + + if (context->getProperty(HeadersAsAttributesRegex.getName(), + headersAsAttributesPattern) + && !headersAsAttributesPattern.empty()) { + _logger->log_info("ListenHTTP using %s: %s", + HeadersAsAttributesRegex.getName().c_str(), + headersAsAttributesPattern.c_str()); + } + + auto numThreads = getMaxConcurrentTasks(); + + _logger->log_info( + "ListenHTTP starting HTTP server on port %s and path %s with %d threads", + listeningPort.c_str(), basePath.c_str(), numThreads); + + // Initialize web server + std::vector < std::string > options; + options.push_back("enable_keep_alive"); + options.push_back("yes"); + options.push_back("keep_alive_timeout_ms"); + options.push_back("15000"); + options.push_back("num_threads"); + options.push_back(std::to_string(numThreads)); + + if (sslCertFile.empty()) { + options.push_back("listening_ports"); + options.push_back(listeningPort); + } else { + listeningPort += "s"; + options.push_back("listening_ports"); + options.push_back(listeningPort); + + options.push_back("ssl_certificate"); + options.push_back(sslCertFile); + + if (!sslCertAuthorityFile.empty()) { + options.push_back("ssl_ca_file"); + options.push_back(sslCertAuthorityFile); + } + + if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { + options.push_back("ssl_verify_peer"); + options.push_back("no"); + } else { + options.push_back("ssl_verify_peer"); + options.push_back("yes"); + } + + if (sslMinVer.compare("SSL2") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(0)); + } else if (sslMinVer.compare("SSL3") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(1)); + } else if (sslMinVer.compare("TLS1.0") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(2)); + } else if (sslMinVer.compare("TLS1.1") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(3)); + } else { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(4)); + } + } + + _server.reset(new CivetServer(options)); + _handler.reset( + new Handler(context, sessionFactory, std::move(authDNPattern), + std::move(headersAsAttributesPattern))); + _server->addHandler(basePath, _handler.get()); +} + +ListenHTTP::~ListenHTTP() { +} + +void ListenHTTP::onTrigger( + core::ProcessContext *context, + core::ProcessSession *session) { + + std::shared_ptr < FlowFileRecord > flowFile = std::static_pointer_cast + < FlowFileRecord > (session->get()); + + // Do nothing if there are no incoming files + if (!flowFile) { + return; + } +} + +ListenHTTP::Handler::Handler( + core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory, + std::string &&authDNPattern, std::string &&headersAsAttributesPattern) + : _authDNRegex(std::move(authDNPattern)), + _headersAsAttributesRegex(std::move(headersAsAttributesPattern)) { + _processContext = context; + _processSessionFactory = sessionFactory; +} + +void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n" + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); +} + +bool ListenHTTP::Handler::handlePost(CivetServer *server, + struct mg_connection *conn) { + _logger = logging::Logger::getLogger(); + + auto req_info = mg_get_request_info(conn); + _logger->log_info("ListenHTTP handling POST request of length %d", + req_info->content_length); + + // If this is a two-way TLS connection, authorize the peer against the configured pattern + if (req_info->is_ssl && req_info->client_cert != nullptr) { + if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) { + mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n" + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); + _logger->log_warn("ListenHTTP client DN not authorized: %s", + req_info->client_cert->subject); + return true; + } + } + + // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html) + mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n"); + + auto session = _processSessionFactory->createSession(); + ListenHTTP::WriteCallback callback(conn, req_info); + auto flowFile = std::static_pointer_cast < FlowFileRecord + > (session->create()); + + if (!flowFile) { + sendErrorResponse(conn); + return true; + } + + try { + session->write(flowFile, &callback); + + // Add filename from "filename" header value (and pattern headers) + for (int i = 0; i < req_info->num_headers; i++) { + auto header = &req_info->http_headers[i]; + + if (strcmp("filename", header->name) == 0) { + if (!flowFile->updateAttribute("filename", header->value)) { + flowFile->addAttribute("filename", header->value); + } + } else if (std::regex_match(header->name, _headersAsAttributesRegex)) { + if (!flowFile->updateAttribute(header->name, header->value)) { + flowFile->addAttribute(header->name, header->value); + } + } + } + + session->transfer(flowFile, Success); + session->commit(); + } catch (std::exception &exception) { + _logger->log_debug("ListenHTTP Caught Exception %s", exception.what()); + sendErrorResponse(conn); + session->rollback(); + throw; + } catch (...) { + _logger->log_debug("ListenHTTP Caught Exception Processor::onTrigger"); + sendErrorResponse(conn); + session->rollback(); + throw; + } + + mg_printf(conn, "HTTP/1.1 200 OK\r\n" + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); + + return true; +} + +ListenHTTP::WriteCallback::WriteCallback( + struct mg_connection *conn, const struct mg_request_info *reqInfo) { + _logger = logging::Logger::getLogger(); + _conn = conn; + _reqInfo = reqInfo; +} + +void ListenHTTP::WriteCallback::process(std::ofstream *stream) { + long long rlen; + long long nlen = 0; + long long tlen = _reqInfo->content_length; + char buf[16384]; + + while (nlen < tlen) { + rlen = tlen - nlen; + + if (rlen > sizeof(buf)) { + rlen = sizeof(buf); + } + + // Read a buffer of data from client + rlen = mg_read(_conn, &buf[0], (size_t) rlen); + + if (rlen <= 0) { + break; + } + + // Transfer buffer data to the output stream + stream->write(&buf[0], rlen); + + nlen += rlen; + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/ListenSyslog.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp new file mode 100644 index 0000000..2dd223c --- /dev/null +++ b/libminifi/src/processors/ListenSyslog.cpp @@ -0,0 +1,331 @@ +/** + * @file ListenSyslog.cpp + * ListenSyslog class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <queue> +#include <stdio.h> +#include <string> +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "processors/ListenSyslog.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +const std::string ListenSyslog::ProcessorName("ListenSyslog"); +core::Property ListenSyslog::RecvBufSize( + "Receive Buffer Size", + "The size of each buffer used to receive Syslog messages.", "65507 B"); +core::Property ListenSyslog::MaxSocketBufSize( + "Max Size of Socket Buffer", + "The maximum size of the socket buffer that should be used.", "1 MB"); +core::Property ListenSyslog::MaxConnections( + "Max Number of TCP Connections", + "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", + "2"); +core::Property ListenSyslog::MaxBatchSize( + "Max Batch Size", + "The maximum number of Syslog events to add to a single FlowFile.", "1"); +core::Property ListenSyslog::MessageDelimiter( + "Message Delimiter", + "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> core::Property).", + "\n"); +core::Property ListenSyslog::ParseMessages( + "Parse Messages", + "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", + "false"); +core::Property ListenSyslog::Protocol( + "Protocol", "The protocol for Syslog communication.", "UDP"); +core::Property ListenSyslog::Port( + "Port", "The port for Syslog communication.", "514"); +core::Relationship ListenSyslog::Success( + "success", "All files are routed to success"); +core::Relationship ListenSyslog::Invalid( + "invalid", "SysLog message format invalid"); + +void ListenSyslog::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(RecvBufSize); + properties.insert(MaxSocketBufSize); + properties.insert(MaxConnections); + properties.insert(MaxBatchSize); + properties.insert(MessageDelimiter); + properties.insert(ParseMessages); + properties.insert(Protocol); + properties.insert(Port); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + relationships.insert(Invalid); + setSupportedRelationships(relationships); +} + +void ListenSyslog::startSocketThread() { + if (_thread != NULL) + return; + + logger_->log_info("ListenSysLog Socket Thread Start"); + _serverTheadRunning = true; + _thread = new std::thread(run, this); + _thread->detach(); +} + +void ListenSyslog::run(ListenSyslog *process) { + process->runThread(); +} + +void ListenSyslog::runThread() { + while (_serverTheadRunning) { + if (_resetServerSocket) { + _resetServerSocket = false; + // need to reset the socket + std::vector<int>::iterator it; + for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) { + int clientSocket = *it; + close(clientSocket); + } + _clientSockets.clear(); + if (_serverSocket > 0) { + close(_serverSocket); + _serverSocket = 0; + } + } + + if (_serverSocket <= 0) { + uint16_t portno = _port; + struct sockaddr_in serv_addr; + int sockfd; + if (_protocol == "TCP") + sockfd = socket(AF_INET, SOCK_STREAM, 0); + else + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (sockfd < 0) { + logger_->log_info("ListenSysLog Server socket creation failed"); + break; + } + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(portno); + if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + logger_->log_error("ListenSysLog Server socket bind failed"); + break; + } + if (_protocol == "TCP") + listen(sockfd, 5); + _serverSocket = sockfd; + logger_->log_error("ListenSysLog Server socket %d bind OK to port %d", + _serverSocket, portno); + } + FD_ZERO(&_readfds); + FD_SET(_serverSocket, &_readfds); + _maxFds = _serverSocket; + std::vector<int>::iterator it; + for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) { + int clientSocket = *it; + if (clientSocket >= _maxFds) + _maxFds = clientSocket; + FD_SET(clientSocket, &_readfds); + } + fd_set fds; + struct timeval tv; + int retval; + fds = _readfds; + tv.tv_sec = 0; + // 100 msec + tv.tv_usec = 100000; + retval = select(_maxFds + 1, &fds, NULL, NULL, &tv); + if (retval < 0) + break; + if (retval == 0) + continue; + if (FD_ISSET(_serverSocket, &fds)) { + // server socket, either we have UDP datagram or TCP connection request + if (_protocol == "TCP") { + socklen_t clilen; + struct sockaddr_in cli_addr; + clilen = sizeof(cli_addr); + int newsockfd = accept(_serverSocket, (struct sockaddr *) &cli_addr, + &clilen); + if (newsockfd > 0) { + if (_clientSockets.size() < _maxConnections) { + _clientSockets.push_back(newsockfd); + logger_->log_info("ListenSysLog new client socket %d connection", + newsockfd); + continue; + } else { + close(newsockfd); + } + } + } else { + socklen_t clilen; + struct sockaddr_in cli_addr; + clilen = sizeof(cli_addr); + int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, + (struct sockaddr *) &cli_addr, &clilen); + if (recvlen > 0 + && (recvlen + getEventQueueByteSize()) <= _recvBufSize) { + uint8_t *payload = new uint8_t[recvlen]; + memcpy(payload, _buffer, recvlen); + putEvent(payload, recvlen); + } + } + } + it = _clientSockets.begin(); + while (it != _clientSockets.end()) { + int clientSocket = *it; + if (FD_ISSET(clientSocket, &fds)) { + int recvlen = readline(clientSocket, (char *) _buffer, sizeof(_buffer)); + if (recvlen <= 0) { + close(clientSocket); + logger_->log_info("ListenSysLog client socket %d close", + clientSocket); + it = _clientSockets.erase(it); + } else { + if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) { + uint8_t *payload = new uint8_t[recvlen]; + memcpy(payload, _buffer, recvlen); + putEvent(payload, recvlen); + } + ++it; + } + } + } + } + return; +} + +int ListenSyslog::readline(int fd, char *bufptr, size_t len) { + char *bufx = bufptr; + static char *bp; + static int cnt = 0; + static char b[2048]; + char c; + + while (--len > 0) { + if (--cnt <= 0) { + cnt = recv(fd, b, sizeof(b), 0); + if (cnt < 0) { + if ( errno == EINTR) { + len++; /* the while will decrement */ + continue; + } + return -1; + } + if (cnt == 0) + return 0; + bp = b; + } + c = *bp++; + *bufptr++ = c; + if (c == '\n') { + *bufptr = '\n'; + return bufptr - bufx + 1; + } + } + return -1; +} + +void ListenSyslog::onTrigger( + core::ProcessContext *context, + core::ProcessSession *session) { + std::string value; + bool needResetServerSocket = false; + if (context->getProperty(Protocol.getName(), value)) { + if (_protocol != value) + needResetServerSocket = true; + _protocol = value; + } + if (context->getProperty(RecvBufSize.getName(), value)) { + core::Property::StringToInt(value, _recvBufSize); + } + if (context->getProperty(MaxSocketBufSize.getName(), value)) { + core::Property::StringToInt(value, + _maxSocketBufSize); + } + if (context->getProperty(MaxConnections.getName(), value)) { + core::Property::StringToInt(value, + _maxConnections); + } + if (context->getProperty(MessageDelimiter.getName(), value)) { + _messageDelimiter = value; + } + if (context->getProperty(ParseMessages.getName(), value)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, + _parseMessages); + } + if (context->getProperty(Port.getName(), value)) { + int64_t oldPort = _port; + core::Property::StringToInt(value, _port); + if (_port != oldPort) + needResetServerSocket = true; + } + if (context->getProperty(MaxBatchSize.getName(), value)) { + core::Property::StringToInt(value, + _maxBatchSize); + } + + if (needResetServerSocket) + _resetServerSocket = true; + + startSocketThread(); + + // read from the event queue + if (isEventQueueEmpty()) { + context->yield(); + return; + } + + std::queue<SysLogEvent> eventQueue; + pollEvent(eventQueue, _maxBatchSize); + bool firstEvent = true; + std::shared_ptr<FlowFileRecord> flowFile = NULL; + while (!eventQueue.empty()) { + SysLogEvent event = eventQueue.front(); + eventQueue.pop(); + if (firstEvent) { + flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + if (!flowFile) + return; + ListenSyslog::WriteCallback callback((char *) event.payload, event.len); + session->write(flowFile, &callback); + delete[] event.payload; + firstEvent = false; + } else { + ListenSyslog::WriteCallback callback((char *) event.payload, event.len); + session->append(flowFile, &callback); + delete[] event.payload; + } + } + flowFile->addAttribute("syslog.protocol", _protocol); + flowFile->addAttribute("syslog.port", std::to_string(_port)); + session->transfer(flowFile, Success); +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/LogAttribute.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp new file mode 100644 index 0000000..e2cf16c --- /dev/null +++ b/libminifi/src/processors/LogAttribute.cpp @@ -0,0 +1,176 @@ +/** + * @file LogAttribute.cpp + * LogAttribute class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <sstream> +#include <string.h> +#include <iostream> + +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "processors/LogAttribute.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { +const std::string LogAttribute::ProcessorName("LogAttribute"); +core::Property LogAttribute::LogLevel( + "Log Level", "The Log Level to use when logging the Attributes", "info"); +core::Property LogAttribute::AttributesToLog( + "Attributes to Log", + "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.", + ""); +core::Property LogAttribute::AttributesToIgnore( + "Attributes to Ignore", + "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.", + ""); +core::Property LogAttribute::LogPayload( + "Log Payload", + "If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.", + "false"); +core::Property LogAttribute::LogPrefix( + "Log prefix", + "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.", + ""); +core::Relationship LogAttribute::Success( + "success", "success operational on the flow record"); + +void LogAttribute::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(LogLevel); + properties.insert(AttributesToLog); + properties.insert(AttributesToIgnore); + properties.insert(LogPayload); + properties.insert(LogPrefix); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void LogAttribute::onTrigger( + core::ProcessContext *context, + core::ProcessSession *session) { + std::string dashLine = "--------------------------------------------------"; + LogAttrLevel level = LogAttrLevelInfo; + bool logPayload = false; + std::ostringstream message; + + std::shared_ptr<core::FlowFile> flow = + session->get(); + + if (!flow) + return; + + std::string value; + if (context->getProperty(LogLevel.getName(), value)) { + logLevelStringToEnum(value, level); + } + if (context->getProperty(LogPrefix.getName(), value)) { + dashLine = "-----" + value + "-----"; + } + if (context->getProperty(LogPayload.getName(), value)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, logPayload); + } + + message << "Logging for flow file " << "\n"; + message << dashLine; + message << "\nStandard FlowFile Attributes"; + message << "\n" << "UUID:" << flow->getUUIDStr(); + message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate()); + message << "\n" << "lineageStartDate:" + << getTimeStr(flow->getlineageStartDate()); + message << "\n" << "Size:" << flow->getSize() << " Offset:" + << flow->getOffset(); + message << "\nFlowFile Attributes Map Content"; + std::map<std::string, std::string> attrs = flow->getAttributes(); + std::map<std::string, std::string>::iterator it; + for (it = attrs.begin(); it != attrs.end(); it++) { + message << "\n" << "key:" << it->first << " value:" << it->second; + } + message << "\nFlowFile Resource Claim Content"; + std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim(); + if (claim) { + message << "\n" << "Content Claim:" << claim->getContentFullPath(); + } + if (logPayload && flow->getSize() <= 1024 * 1024) { + message << "\n" << "Payload:" << "\n"; + ReadCallback callback(flow->getSize()); + session->read(flow, &callback); + for (unsigned int i = 0, j = 0; i < callback._readSize; i++) { + char temp[8]; + sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i])); + message << temp; + j++; + if (j == 16) { + message << '\n'; + j = 0; + } + } + } + message << "\n" << dashLine << std::ends; + std::string output = message.str(); + + switch (level) { + case LogAttrLevelInfo: + logger_->log_info("%s", output.c_str()); + break; + case LogAttrLevelDebug: + logger_->log_debug("%s", output.c_str()); + break; + case LogAttrLevelError: + logger_->log_error("%s", output.c_str()); + break; + case LogAttrLevelTrace: + logger_->log_trace("%s", output.c_str()); + break; + case LogAttrLevelWarn: + logger_->log_warn("%s", output.c_str()); + break; + default: + break; + } + + // Test Import + /* + std::shared_ptr<FlowFileRecord> importRecord = session->create(); + session->import(claim->getContentFullPath(), importRecord); + session->transfer(importRecord, Success); */ + + // Transfer to the relationship + session->transfer(flow, Success); +} + + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/PutFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp new file mode 100644 index 0000000..85cf09b --- /dev/null +++ b/libminifi/src/processors/PutFile.cpp @@ -0,0 +1,213 @@ +/** + * @file PutFile.cpp + * PutFile class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <sstream> +#include <stdio.h> +#include <string> +#include <iostream> +#include <fstream> +#include <uuid/uuid.h> + +#include "utils/StringUtils.h" +#include "utils/TimeUtil.h" +#include "processors/PutFile.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace"); +const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore"); +const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail"); + +const std::string PutFile::ProcessorName("PutFile"); + +core::Property PutFile::Directory( + "Output Directory", "The output directory to which to put files", "."); +core::Property PutFile::ConflictResolution( + "Conflict Resolution Strategy", + "Indicates what should happen when a file with the same name already exists in the output directory", + CONFLICT_RESOLUTION_STRATEGY_FAIL); + +core::Relationship PutFile::Success( + "success", "All files are routed to success"); +core::Relationship PutFile::Failure( + "failure", + "Failed files (conflict, write failure, etc.) are transferred to failure"); + +void PutFile::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(Directory); + properties.insert(ConflictResolution); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void PutFile::onTrigger( + core::ProcessContext *context, + core::ProcessSession *session) { + std::string directory; + + if (!context->getProperty(Directory.getName(), directory)) { + logger_->log_error("Directory attribute is missing or invalid"); + return; + } + + std::string conflictResolution; + + if (!context->getProperty(ConflictResolution.getName(), conflictResolution)) { + logger_->log_error( + "Conflict Resolution Strategy attribute is missing or invalid"); + return; + } + + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); + + // Do nothing if there are no incoming files + if (!flowFile) { + return; + } + + std::string filename; + flowFile->getKeyedAttribute(FILENAME, filename); + + // Generate a safe (universally-unique) temporary filename on the same partition + char tmpFileUuidStr[37]; + uuid_t tmpFileUuid; + uuid_generate(tmpFileUuid); + uuid_unparse_lower(tmpFileUuid, tmpFileUuidStr); + std::stringstream tmpFileSs; + tmpFileSs << directory << "/." << filename << "." << tmpFileUuidStr; + std::string tmpFile = tmpFileSs.str(); + logger_->log_info("PutFile using temporary file %s", tmpFile.c_str()); + + // Determine dest full file paths + std::stringstream destFileSs; + destFileSs << directory << "/" << filename; + std::string destFile = destFileSs.str(); + + logger_->log_info("PutFile writing file %s into directory %s", + filename.c_str(), directory.c_str()); + + // If file exists, apply conflict resolution strategy + struct stat statResult; + + if (stat(destFile.c_str(), &statResult) == 0) { + logger_->log_info( + "Destination file %s exists; applying Conflict Resolution Strategy: %s", + destFile.c_str(), conflictResolution.c_str()); + + if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_REPLACE) { + putFile(session, flowFile, tmpFile, destFile); + } else if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_IGNORE) { + session->transfer(flowFile, Success); + } else { + session->transfer(flowFile, Failure); + } + } else { + putFile(session, flowFile, tmpFile, destFile); + } +} + +bool PutFile::putFile(core::ProcessSession *session, + std::shared_ptr<FlowFileRecord> flowFile, const std::string &tmpFile, + const std::string &destFile) { + + ReadCallback cb(tmpFile, destFile); + session->read(flowFile, &cb); + + if (cb.commit()) { + session->transfer(flowFile, Success); + return true; + } else { + session->transfer(flowFile, Failure); + } + return false; +} + +PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, + const std::string &destFile) + : _tmpFile(tmpFile), + _tmpFileOs(tmpFile), + _destFile(destFile) { + logger_ = logging::Logger::getLogger(); +} + +// Copy the entire file contents to the temporary file +void PutFile::ReadCallback::process(std::ifstream *stream) { + // Copy file contents into tmp file + _writeSucceeded = false; + _tmpFileOs << stream->rdbuf(); + _writeSucceeded = true; +} + +// Renames tmp file to final destination +// Returns true if commit succeeded +bool PutFile::ReadCallback::commit() { + bool success = false; + + logger_->log_info("PutFile committing put file operation to %s", + _destFile.c_str()); + + if (_writeSucceeded) { + _tmpFileOs.close(); + + if (rename(_tmpFile.c_str(), _destFile.c_str())) { + logger_->log_info( + "PutFile commit put file operation to %s failed because rename() call failed", + _destFile.c_str()); + } else { + success = true; + logger_->log_info("PutFile commit put file operation to %s succeeded", + _destFile.c_str()); + } + } else { + logger_->log_error( + "PutFile commit put file operation to %s failed because write failed", + _destFile.c_str()); + } + + return success; +} + +// Clean up resources +PutFile::ReadCallback::~ReadCallback() { + // Close tmp file + _tmpFileOs.close(); + + // Clean up tmp file, if necessary + unlink(_tmpFile.c_str()); +} + + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/RealTimeDataCollector.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/RealTimeDataCollector.cpp b/libminifi/src/processors/RealTimeDataCollector.cpp new file mode 100644 index 0000000..922835d --- /dev/null +++ b/libminifi/src/processors/RealTimeDataCollector.cpp @@ -0,0 +1,480 @@ +/** + * @file RealTimeDataCollector.cpp + * RealTimeDataCollector class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <memory> +#include <random> +#include <netinet/tcp.h> + +#include "utils/StringUtils.h" +#include "processors/RealTimeDataCollector.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { +const std::string RealTimeDataCollector::ProcessorName("RealTimeDataCollector"); +core::Property RealTimeDataCollector::FILENAME( + "File Name", "File Name for the real time processor to process", + "data.osp"); +core::Property RealTimeDataCollector::REALTIMESERVERNAME( + "Real Time Server Name", "Real Time Server Name", "localhost"); +core::Property RealTimeDataCollector::REALTIMESERVERPORT( + "Real Time Server Port", "Real Time Server Port", "10000"); +core::Property RealTimeDataCollector::BATCHSERVERNAME( + "Batch Server Name", "Batch Server Name", "localhost"); +core::Property RealTimeDataCollector::BATCHSERVERPORT( + "Batch Server Port", "Batch Server Port", "10001"); +core::Property RealTimeDataCollector::ITERATION( + "Iteration", "If true, sample osp file will be iterated", "true"); +core::Property RealTimeDataCollector::REALTIMEMSGID( + "Real Time Message ID", "Real Time Message ID", "41"); +core::Property RealTimeDataCollector::BATCHMSGID( + "Batch Message ID", "Batch Message ID", "172, 30, 48"); +core::Property RealTimeDataCollector::REALTIMEINTERVAL( + "Real Time Interval", "Real Time Data Collection Interval in msec", + "10 ms"); +core::Property RealTimeDataCollector::BATCHINTERVAL( + "Batch Time Interval", "Batch Processing Interval in msec", "100 ms"); +core::Property RealTimeDataCollector::BATCHMAXBUFFERSIZE( + "Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144"); +core::Relationship RealTimeDataCollector::Success( + "success", "success operational on the flow record"); + +void RealTimeDataCollector::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(FILENAME); + properties.insert(REALTIMESERVERNAME); + properties.insert(REALTIMESERVERPORT); + properties.insert(BATCHSERVERNAME); + properties.insert(BATCHSERVERPORT); + properties.insert(ITERATION); + properties.insert(REALTIMEMSGID); + properties.insert(BATCHMSGID); + properties.insert(REALTIMEINTERVAL); + properties.insert(BATCHINTERVAL); + properties.insert(BATCHMAXBUFFERSIZE); + + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); + +} + +int RealTimeDataCollector::connectServer(const char *host, uint16_t port) { + in_addr_t addr; + int sock = 0; + struct hostent *h; +#ifdef __MACH__ + h = gethostbyname(host); +#else + char buf[1024]; + struct hostent he; + int hh_errno; + gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); +#endif + memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + logger_->log_error("Could not create socket to hostName %s", host); + return 0; + } + +#ifndef __MACH__ + int opt = 1; + bool nagle_off = true; + + if (nagle_off) + { + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) + { + logger_->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + return 0; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&opt, sizeof(opt)) < 0) + { + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return 0; + } + } + + int sndsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) + { + logger_->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + return 0; + } +#endif + + struct sockaddr_in sa; + socklen_t socklen; + int status; + + //TODO bind socket to the interface + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = htonl(INADDR_ANY); + sa.sin_port = htons(0); + socklen = sizeof(sa); + if (bind(sock, (struct sockaddr *) &sa, socklen) < 0) { + logger_->log_error("socket bind failed"); + close(sock); + return 0; + } + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = addr; + sa.sin_port = htons(port); + socklen = sizeof(sa); + + status = connect(sock, (struct sockaddr *) &sa, socklen); + + if (status < 0) { + logger_->log_error("socket connect failed to %s %d", host, port); + close(sock); + return 0; + } + + logger_->log_info("socket %d connect to server %s port %d success", sock, + host, port); + + return sock; +} + +int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen) { + int ret = 0, bytes = 0; + + while (bytes < buflen) { + ret = send(socket, buf + bytes, buflen - bytes, 0); + //check for errors + if (ret == -1) { + return ret; + } + bytes += ret; + } + + if (ret) + logger_->log_debug("Send data size %d over socket %d", buflen, socket); + + return ret; +} + +void RealTimeDataCollector::onTriggerRealTime( + core::ProcessContext *context, + core::ProcessSession *session) { + if (_realTimeAccumulated >= this->_realTimeInterval) { + std::string value; + if (this->getProperty(REALTIMEMSGID.getName(), value)) { + this->_realTimeMsgID.clear(); + this->logger_->log_info("Real Time Msg IDs %s", value.c_str()); + std::stringstream lineStream(value); + std::string cell; + + while (std::getline(lineStream, cell, ',')) { + this->_realTimeMsgID.push_back(cell); + // this->logger_->log_debug("Real Time Msg ID %s", cell.c_str()); + } + } + if (this->getProperty(BATCHMSGID.getName(), value)) { + this->_batchMsgID.clear(); + this->logger_->log_info("Batch Msg IDs %s", value.c_str()); + std::stringstream lineStream(value); + std::string cell; + + while (std::getline(lineStream, cell, ',')) { + cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell); + this->_batchMsgID.push_back(cell); + } + } + // Open the file + if (!this->_fileStream.is_open()) { + _fileStream.open(this->_fileName.c_str(), std::ifstream::in); + if (this->_fileStream.is_open()) + logger_->log_debug("open %s", _fileName.c_str()); + } + if (!_fileStream.good()) { + logger_->log_error("load data file failed %s", _fileName.c_str()); + return; + } + if (this->_fileStream.is_open()) { + std::string line; + + while (std::getline(_fileStream, line)) { + line += "\n"; + std::stringstream lineStream(line); + std::string cell; + if (std::getline(lineStream, cell, ',')) { + cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell); + // Check whether it match to the batch traffic + for (std::vector<std::string>::iterator it = _batchMsgID.begin(); + it != _batchMsgID.end(); ++it) { + if (cell == *it) { + // push the batch data to the queue + std::lock_guard<std::mutex> lock(mutex_); + while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) { + std::string item = _queue.front(); + _queuedDataSize -= item.size(); + logger_->log_debug( + "Pop item size %d from batch queue, queue buffer size %d", + item.size(), _queuedDataSize); + _queue.pop(); + } + _queue.push(line); + _queuedDataSize += line.size(); + logger_->log_debug( + "Push batch msg ID %s into batch queue, queue buffer size %d", + cell.c_str(), _queuedDataSize); + } + } + bool findRealTime = false; + // Check whether it match to the real time traffic + for (std::vector<std::string>::iterator it = _realTimeMsgID.begin(); + it != _realTimeMsgID.end(); ++it) { + if (cell == *it) { + int status = 0; + if (this->_realTimeSocket <= 0) { + // Connect the LTE socket + uint16_t port = _realTimeServerPort; + this->_realTimeSocket = connectServer( + _realTimeServerName.c_str(), port); + } + if (this->_realTimeSocket) { + // try to send the data + status = sendData(_realTimeSocket, line.data(), line.size()); + if (status < 0) { + close(_realTimeSocket); + _realTimeSocket = 0; + } + } + if (this->_realTimeSocket <= 0 || status < 0) { + // push the batch data to the queue + std::lock_guard<std::mutex> lock(mutex_); + while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) { + std::string item = _queue.front(); + _queuedDataSize -= item.size(); + logger_->log_debug( + "Pop item size %d from batch queue, queue buffer size %d", + item.size(), _queuedDataSize); + _queue.pop(); + } + _queue.push(line); + _queuedDataSize += line.size(); + logger_->log_debug( + "Push real time msg ID %s into batch queue, queue buffer size %d", + cell.c_str(), _queuedDataSize); + } + // find real time + findRealTime = true; + } // cell + } // for real time pattern + if (findRealTime) + // we break the while once we find the first real time + break; + } // if get line + } // while + if (_fileStream.eof()) { + _fileStream.close(); + } + } // if open + _realTimeAccumulated = 0; + } + std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>( + context->getProcessorNode().getProcessor()); + _realTimeAccumulated += processor->getSchedulingPeriodNano(); +} + +void RealTimeDataCollector::onTriggerBatch( + core::ProcessContext *context, + core::ProcessSession *session) { + if (_batchAcccumulated >= this->_batchInterval) { + // logger_->log_info("onTriggerBatch"); + // dequeue the batch and send over WIFI + int status = 0; + if (this->_batchSocket <= 0) { + // Connect the WIFI socket + uint16_t port = _batchServerPort; + this->_batchSocket = connectServer(_batchServerName.c_str(), port); + } + if (this->_batchSocket) { + std::lock_guard<std::mutex> lock(mutex_); + + while (!_queue.empty()) { + std::string line = _queue.front(); + status = sendData(_batchSocket, line.data(), line.size()); + _queue.pop(); + _queuedDataSize -= line.size(); + if (status < 0) { + close(_batchSocket); + _batchSocket = 0; + break; + } + } + } + _batchAcccumulated = 0; + } + std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>( + context->getProcessorNode().getProcessor()); + _batchAcccumulated += processor->getSchedulingPeriodNano(); +} + +void RealTimeDataCollector::onTrigger( + core::ProcessContext *context, + core::ProcessSession *session) { + std::thread::id id = std::this_thread::get_id(); + + if (id == _realTimeThreadId) + return onTriggerRealTime(context, session); + else if (id == _batchThreadId) + return onTriggerBatch(context, session); + else { + std::lock_guard<std::mutex> lock(mutex_); + if (!this->_firstInvoking) { + this->_fileName = "data.osp"; + std::string value; + if (this->getProperty(FILENAME.getName(), value)) { + this->_fileName = value; + this->logger_->log_info("Data Collector File Name %s", + _fileName.c_str()); + } + this->_realTimeServerName = "localhost"; + if (this->getProperty(REALTIMESERVERNAME.getName(), value)) { + this->_realTimeServerName = value; + this->logger_->log_info("Real Time Server Name %s", + this->_realTimeServerName.c_str()); + } + this->_realTimeServerPort = 10000; + if (this->getProperty(REALTIMESERVERPORT.getName(), value)) { + core::Property::StringToInt( + value, _realTimeServerPort); + this->logger_->log_info("Real Time Server Port %d", + _realTimeServerPort); + } + if (this->getProperty(BATCHSERVERNAME.getName(), value)) { + this->_batchServerName = value; + this->logger_->log_info("Batch Server Name %s", + this->_batchServerName.c_str()); + } + this->_batchServerPort = 10001; + if (this->getProperty(BATCHSERVERPORT.getName(), value)) { + core::Property::StringToInt( + value, _batchServerPort); + this->logger_->log_info("Batch Server Port %d", _batchServerPort); + } + if (this->getProperty(ITERATION.getName(), value)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, this->_iteration); + logger_->log_info("Iteration %d", _iteration); + } + this->_realTimeInterval = 10000000; //10 msec + if (this->getProperty(REALTIMEINTERVAL.getName(), value)) { + core::TimeUnit unit; + if (core::Property::StringToTime( + value, _realTimeInterval, unit) + && core::Property::ConvertTimeUnitToNS( + _realTimeInterval, unit, _realTimeInterval)) { + logger_->log_info("Real Time Interval: [%d] ns", _realTimeInterval); + } + } + this->_batchInterval = 100000000; //100 msec + if (this->getProperty(BATCHINTERVAL.getName(), value)) { + core::TimeUnit unit; + if (core::Property::StringToTime( + value, _batchInterval, unit) + && core::Property::ConvertTimeUnitToNS( + _batchInterval, unit, _batchInterval)) { + logger_->log_info("Batch Time Interval: [%d] ns", _batchInterval); + } + } + this->_batchMaxBufferSize = 256 * 1024; + if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value)) { + core::Property::StringToInt( + value, _batchMaxBufferSize); + this->logger_->log_info("Batch Max Buffer Size %d", + _batchMaxBufferSize); + } + if (this->getProperty(REALTIMEMSGID.getName(), value)) { + this->logger_->log_info("Real Time Msg IDs %s", value.c_str()); + std::stringstream lineStream(value); + std::string cell; + + while (std::getline(lineStream, cell, ',')) { + this->_realTimeMsgID.push_back(cell); + this->logger_->log_info("Real Time Msg ID %s", cell.c_str()); + } + } + if (this->getProperty(BATCHMSGID.getName(), value)) { + this->logger_->log_info("Batch Msg IDs %s", value.c_str()); + std::stringstream lineStream(value); + std::string cell; + + while (std::getline(lineStream, cell, ',')) { + cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell); + this->_batchMsgID.push_back(cell); + this->logger_->log_info("Batch Msg ID %s", cell.c_str()); + } + } + // Connect the LTE socket + uint16_t port = _realTimeServerPort; + + this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port); + + // Connect the WIFI socket + port = _batchServerPort; + + this->_batchSocket = connectServer(_batchServerName.c_str(), port); + + // Open the file + _fileStream.open(this->_fileName.c_str(), std::ifstream::in); + if (!_fileStream.good()) { + logger_->log_error("load data file failed %s", _fileName.c_str()); + return; + } else { + logger_->log_debug("open %s", _fileName.c_str()); + } + _realTimeThreadId = id; + this->_firstInvoking = true; + } else { + if (id != _realTimeThreadId) + _batchThreadId = id; + this->_firstInvoking = false; + } + } +} +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
