http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/Repository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp new file mode 100644 index 0000000..9a27785 --- /dev/null +++ b/libminifi/src/core/Repository.cpp @@ -0,0 +1,65 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <cstdint> +#include <vector> +#include <arpa/inet.h> +#include "io/DataStream.h" +#include "io/Serializable.h" +#include "core/Relationship.h" +#include "core/logging/Logger.h" +#include "FlowController.h" +#include "core/Repository.h" +#include "provenance/Provenance.h" +#include "core/repository/FlowFileRepository.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +void Repository::start() { + if (this->purge_period_ <= 0) + return; + if (running_) + return; + thread_ = std::thread(&Repository::threadExecutor, this); + thread_.detach(); + running_ = true; + logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); +} + +void Repository::stop() { + if (!running_) + return; + running_ = false; + if (thread_.joinable()) + thread_.join(); + logger_->log_info("%s Repository Monitor Thread Stop", name_.c_str()); +} + +// repoSize +uint64_t Repository::repoSize() { + return repo_size_; +} + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/RepositoryFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp new file mode 100644 index 0000000..ef0b9ef --- /dev/null +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -0,0 +1,69 @@ +#include "core/Repository.h" +#include "core/Repository.h" + +#ifdef LEVELDB_SUPPORT +#include "core/repository/FlowFileRepository.h" +#include "provenance/ProvenanceRepository.h" +#endif + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +#ifndef LEVELDB_SUPPORT + namespace provenance{ + class ProvenanceRepository; + } +#endif +namespace core { + +#ifndef LEVELDB_SUPPORT + class FlowFileRepository; +#endif + + + std::shared_ptr<core::Repository> createRepository( + const std::string configuration_class_name, bool fail_safe = false) { + + std::string class_name_lc = configuration_class_name; + std::transform(class_name_lc.begin(), class_name_lc.end(), + class_name_lc.begin(), ::tolower); + try { + std::shared_ptr<core::Repository> return_obj = nullptr; + if (class_name_lc == "flowfilerepository") { + + return_obj = std::shared_ptr<core::Repository>((core::Repository*)instantiate<core::repository::FlowFileRepository>()); + } else if (class_name_lc == "provenancerepository") { + + + return_obj = std::shared_ptr<core::Repository>((core::Repository*)instantiate<provenance::ProvenanceRepository>()); + + } + + if (return_obj){ + return return_obj; + } + if (fail_safe) { + return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, + 1, 1); + } else { + throw std::runtime_error( + "Support for the provided configuration class could not be found"); + } + } catch (const std::runtime_error &r) { + if (fail_safe) { + return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, + 1, 1); + } + } + + throw std::runtime_error( + "Support for the provided configuration class could not be found"); + } + + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/logging/BaseLogger.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/BaseLogger.cpp b/libminifi/src/core/logging/BaseLogger.cpp new file mode 100644 index 0000000..a6b43a8 --- /dev/null +++ b/libminifi/src/core/logging/BaseLogger.cpp @@ -0,0 +1,161 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/logging/BaseLogger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { + +// Logger related configuration items. +const char *BaseLogger::nifi_log_level = "nifi.log.level"; +const char *BaseLogger::nifi_log_appender = "nifi.log.appender"; + +/** + * @brief Log error message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ +void BaseLogger::log_error(const char * const format, ...) { + if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::err)) + return; + FILL_BUFFER + log_str(err, buffer); +} +/** + * @brief Log warn message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ +void BaseLogger::log_warn(const char * const format, ...) { + if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::warn)) + return; + FILL_BUFFER + log_str(warn, buffer); +} +/** + * @brief Log info message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ +void BaseLogger::log_info(const char * const format, ...) { + if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::info)) + return; + FILL_BUFFER + log_str(info, buffer); +} +/** + * @brief Log debug message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ +void BaseLogger::log_debug(const char * const format, ...) { + + if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::debug)) + return; + FILL_BUFFER + log_str(debug, buffer); +} +/** + * @brief Log trace message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ +void BaseLogger::log_trace(const char * const format, ...) { + + if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::trace)) + return; + FILL_BUFFER + log_str(debug, buffer); +} + +// overridables + +/** + * @brief Log error message + * @param format format string ('man printf' for syntax) + * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match + */ +void BaseLogger::log_str(LOG_LEVEL_E level, const std::string &buffer) { + switch (level) { + case err: + case critical: + if (stderr_ != nullptr) { + stderr_->error(buffer); + } else { + logger_->error(buffer); + } + break; + case warn: + logger_->warn(buffer); + break; + case info: + logger_->info(buffer); + break; + case debug: + logger_->debug(buffer); + break; + case trace: + logger_->trace(buffer); + break; + case off: + break; + default: + logger_->info(buffer); + break; + } + +} + +void BaseLogger::setLogLevel(const std::string &level, + LOG_LEVEL_E defaultLevel) { + std::string logLevel = level; + std::transform(logLevel.begin(), logLevel.end(), logLevel.begin(), ::tolower); + + if (logLevel == "trace") { + setLogLevel(trace); + } else if (logLevel == "debug") { + setLogLevel(debug); + } else if (logLevel == "info") { + setLogLevel(info); + } else if (logLevel == "warn") { + setLogLevel(warn); + } else if (logLevel == "error") { + setLogLevel(err); + } else if (logLevel == "critical") { + setLogLevel(critical); + } else if (logLevel == "off") { + setLogLevel(off); + } else { + setLogLevel(defaultLevel); + } +} + +void BaseLogger::set_error_logger(std::shared_ptr<spdlog::logger> other) { + stderr_ = std::move(other); +} + +} /* namespace logging */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/logging/LogAppenders.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/LogAppenders.cpp b/libminifi/src/core/logging/LogAppenders.cpp new file mode 100644 index 0000000..5d92334 --- /dev/null +++ b/libminifi/src/core/logging/LogAppenders.cpp @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/logging/LogAppenders.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { + +const char *OutputStreamAppender::nifi_log_output_stream_error_stderr="nifi.log.outputstream.appender.error.stderr"; + +const char *RollingAppender::nifi_log_rolling_apender_file = "nifi.log.rolling.appender.file"; +const char *RollingAppender::nifi_log_rolling_appender_max_files = "nifi.log.rolling.appender.max.files"; +const char *RollingAppender::nifi_log_rolling_appender_max_file_size = "nifi.log.rolling.appender.max.file_size"; + + +} /* namespace logging */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/logging/Logger.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/Logger.cpp b/libminifi/src/core/logging/Logger.cpp new file mode 100644 index 0000000..d8cadf3 --- /dev/null +++ b/libminifi/src/core/logging/Logger.cpp @@ -0,0 +1,40 @@ +/** + * @file Logger.cpp + * Logger class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/logging/Logger.h" + +#include <vector> +#include <queue> +#include <map> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace logging { + +std::shared_ptr<Logger> Logger::singleton_logger_(nullptr); + +} /* namespace logging */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/repository/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp new file mode 100644 index 0000000..c495a67 --- /dev/null +++ b/libminifi/src/core/repository/FlowFileRepository.cpp @@ -0,0 +1,109 @@ +#include "core/repository/FlowFileRepository.h" +#include "FlowFileRecord.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +void FlowFileRepository::run() { + // threshold for purge + uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; + while (running_) { + std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); + uint64_t curTime = getTimeMillis(); + uint64_t size = repoSize(); + if (size >= purgeThreshold) { + std::vector<std::string> purgeList; + leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); + + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this()); + std::string key = it->key().ToString(); + if (eventRead->DeSerialize((uint8_t *) it->value().data(), + (int) it->value().size())) { + if ((curTime - eventRead->getEventTime()) > max_partition_millis_) + purgeList.push_back(key); + } else { + logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(), + key.c_str()); + purgeList.push_back(key); + } + } + delete it; + for (auto eventId : purgeList) { + logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), + eventId.c_str()); + Delete(eventId); + } + } + if (size > max_partition_bytes_) + repo_full_ = true; + else + repo_full_ = false; + } + return; +} + +void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) + { + + std::vector<std::string> purgeList; + leveldb::Iterator* it = db_->NewIterator( + leveldb::ReadOptions()); + + for (it->SeekToFirst(); it->Valid(); it->Next()) + { + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this()); + std::string key = it->key().ToString(); + if (eventRead->DeSerialize((uint8_t *) it->value().data(), + (int) it->value().size())) + { + auto search = connectionMap.find(eventRead->getConnectionUuid()); + if (search != connectionMap.end()) + { + // we find the connection for the persistent flowfile, create the flowfile and enqueue that + std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead); + std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(),flow_file_ref); + // set store to repo to true so that we do need to persistent again in enqueue + record->setStoredToRepository(true); + search->second->put(record); + } + else + { + if (eventRead->getContentFullPath().length() > 0) + { + std::remove(eventRead->getContentFullPath().c_str()); + } + purgeList.push_back(key); + } + } + else + { + purgeList.push_back(key); + } + } + + delete it; + std::vector<std::string>::iterator itPurge; + for (itPurge = purgeList.begin(); itPurge != purgeList.end(); + itPurge++) + { + std::string eventId = *itPurge; + logger_->log_info("Repository Repo %s Purge %s", + name_.c_str(), + eventId.c_str()); + Delete(eventId); + } + + return; +} + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp new file mode 100644 index 0000000..8e59363 --- /dev/null +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -0,0 +1,490 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/yaml/YamlConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml( + YAML::Node rootFlowNode) { + uuid_t uuid; + + std::string flowName = rootFlowNode["name"].as<std::string>(); + std::string id ; + + try { + rootFlowNode["id"].as<std::string>(); + + uuid_parse(id.c_str(), uuid); + }catch(...) + { + logger_->log_warn("Generating random ID for root node"); + uuid_generate(uuid); + char uuid_str[37]; + uuid_unparse(uuid,uuid_str); + id = uuid_str; + } + + logger_->log_debug("parseRootProcessGroup: id => [%s]", id.c_str()); + logger_->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str()); + std::unique_ptr<core::ProcessGroup> group = + FlowConfiguration::createRootProcessGroup(flowName, uuid); + + this->name_ = flowName; + + return group.release(); +} + +void YamlConfiguration::parseProcessorNodeYaml( + YAML::Node processorsNode, core::ProcessGroup * parentGroup) { + int64_t schedulingPeriod = -1; + int64_t penalizationPeriod = -1; + int64_t yieldPeriod = -1; + int64_t runDurationNanos = -1; + uuid_t uuid; + std::shared_ptr<core::Processor> processor = nullptr; + + if (!parentGroup) { + logger_->log_error("parseProcessNodeYaml: no parent group exists"); + return; + } + + if (processorsNode) { + + if (processorsNode.IsSequence()) { + // Evaluate sequence of processors + int numProcessors = processorsNode.size(); + + for (YAML::const_iterator iter = processorsNode.begin(); + iter != processorsNode.end(); ++iter) { + core::ProcessorConfig procCfg; + YAML::Node procNode = iter->as<YAML::Node>(); + + procCfg.name = procNode["name"].as<std::string>(); + procCfg.id = procNode["id"].as<std::string>(); + logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", + procCfg.name.c_str(), procCfg.id.c_str()); + procCfg.javaClass = procNode["class"].as<std::string>(); + logger_->log_debug("parseProcessorNode: class => [%s]", + procCfg.javaClass.c_str()); + + uuid_parse(procCfg.id.c_str(), uuid); + + // Determine the processor name only from the Java class + int lastOfIdx = procCfg.javaClass.find_last_of("."); + if (lastOfIdx != std::string::npos) { + lastOfIdx++; // if a value is found, increment to move beyond the . + int nameLength = procCfg.javaClass.length() - lastOfIdx; + std::string processorName = procCfg.javaClass.substr(lastOfIdx, + nameLength); + processor = this->createProcessor(processorName, uuid); + } + + if (!processor) { + logger_->log_error("Could not create a processor %s with name %s", + procCfg.name.c_str(), procCfg.id.c_str()); + throw std::invalid_argument( + "Could not create processor " + procCfg.name); + } + processor->setName(procCfg.name); + + procCfg.maxConcurrentTasks = procNode["max concurrent tasks"] + .as<std::string>(); + logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", + procCfg.maxConcurrentTasks.c_str()); + procCfg.schedulingStrategy = procNode["scheduling strategy"] + .as<std::string>(); + logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", + procCfg.schedulingStrategy.c_str()); + procCfg.schedulingPeriod = + procNode["scheduling period"].as<std::string>(); + logger_->log_debug("parseProcessorNode: scheduling period => [%s]", + procCfg.schedulingPeriod.c_str()); + procCfg.penalizationPeriod = procNode["penalization period"] + .as<std::string>(); + logger_->log_debug("parseProcessorNode: penalization period => [%s]", + procCfg.penalizationPeriod.c_str()); + procCfg.yieldPeriod = procNode["yield period"].as<std::string>(); + logger_->log_debug("parseProcessorNode: yield period => [%s]", + procCfg.yieldPeriod.c_str()); + procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>(); + logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", + procCfg.runDurationNanos.c_str()); + + // handle auto-terminated relationships + YAML::Node autoTerminatedSequence = + procNode["auto-terminated relationships list"]; + std::vector<std::string> rawAutoTerminatedRelationshipValues; + if (autoTerminatedSequence.IsSequence() + && !autoTerminatedSequence.IsNull() + && autoTerminatedSequence.size() > 0) { + for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); + relIter != autoTerminatedSequence.end(); ++relIter) { + std::string autoTerminatedRel = relIter->as<std::string>(); + rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); + } + } + procCfg.autoTerminatedRelationships = + rawAutoTerminatedRelationshipValues; + + // handle processor properties + YAML::Node propertiesNode = procNode["Properties"]; + parsePropertiesNodeYaml(&propertiesNode, processor); + + // Take care of scheduling + core::TimeUnit unit; + if (core::Property::StringToTime(procCfg.schedulingPeriod, + schedulingPeriod, unit) + && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, + schedulingPeriod)) { + logger_->log_debug( + "convert: parseProcessorNode: schedulingPeriod => [%d] ns", + schedulingPeriod); + processor->setSchedulingPeriodNano(schedulingPeriod); + } + + if (core::Property::StringToTime(procCfg.penalizationPeriod, + penalizationPeriod, unit) + && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, + penalizationPeriod)) { + logger_->log_debug( + "convert: parseProcessorNode: penalizationPeriod => [%d] ms", + penalizationPeriod); + processor->setPenalizationPeriodMsec(penalizationPeriod); + } + + if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) + && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, + yieldPeriod)) { + logger_->log_debug( + "convert: parseProcessorNode: yieldPeriod => [%d] ms", + yieldPeriod); + processor->setYieldPeriodMsec(yieldPeriod); + } + + // Default to running + processor->setScheduledState(core::RUNNING); + + if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { + processor->setSchedulingStrategy(core::TIMER_DRIVEN); + logger_->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy.c_str()); + } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { + processor->setSchedulingStrategy(core::EVENT_DRIVEN); + logger_->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy.c_str()); + } else { + processor->setSchedulingStrategy(core::CRON_DRIVEN); + logger_->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy.c_str()); + + } + + int64_t maxConcurrentTasks; + if (core::Property::StringToInt(procCfg.maxConcurrentTasks, + maxConcurrentTasks)) { + logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", + maxConcurrentTasks); + processor->setMaxConcurrentTasks(maxConcurrentTasks); + } + + if (core::Property::StringToInt(procCfg.runDurationNanos, + runDurationNanos)) { + logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", + runDurationNanos); + processor->setRunDurationNano(runDurationNanos); + } + + std::set<core::Relationship> autoTerminatedRelationships; + for (auto &&relString : procCfg.autoTerminatedRelationships) { + core::Relationship relationship(relString, ""); + logger_->log_debug( + "parseProcessorNode: autoTerminatedRelationship => [%s]", + relString.c_str()); + autoTerminatedRelationships.insert(relationship); + } + + processor->setAutoTerminatedRelationships(autoTerminatedRelationships); + + parentGroup->addProcessor(processor); + } + } + } else { + throw new std::invalid_argument( + "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); + } +} + +void YamlConfiguration::parseRemoteProcessGroupYaml( + YAML::Node *rpgNode, core::ProcessGroup * parentGroup) { + uuid_t uuid; + + if (!parentGroup) { + logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists"); + return; + } + + if (rpgNode) { + if (rpgNode->IsSequence()) { + for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); + ++iter) { + YAML::Node rpgNode = iter->as<YAML::Node>(); + + auto name = rpgNode["name"].as<std::string>(); + auto id = rpgNode["id"].as<std::string>(); + + logger_->log_debug( + "parseRemoteProcessGroupYaml: name => [%s], id => [%s]", + name.c_str(), id.c_str()); + + std::string url = rpgNode["url"].as<std::string>(); + logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", + url.c_str()); + + std::string timeout = rpgNode["timeout"].as<std::string>(); + logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", + timeout.c_str()); + + std::string yieldPeriod = rpgNode["yield period"].as<std::string>(); + logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", + yieldPeriod.c_str()); + + YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>(); + YAML::Node outputPorts = rpgNode["Output Ports"].as<YAML::Node>(); + core::ProcessGroup *group = NULL; + + uuid_parse(id.c_str(), uuid); + + int64_t timeoutValue = -1; + int64_t yieldPeriodValue = -1; + + group = this->createRemoteProcessGroup(name.c_str(), uuid).release(); + group->setParent(parentGroup); + parentGroup->addProcessGroup(group); + + core::TimeUnit unit; + + if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) + && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, + yieldPeriodValue) && group) { + logger_->log_debug( + "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", + yieldPeriodValue); + group->setYieldPeriodMsec(yieldPeriodValue); + } + + if (core::Property::StringToTime(timeout, timeoutValue, unit) + && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, + timeoutValue) && group) { + logger_->log_debug( + "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", + timeoutValue); + group->setTimeOut(timeoutValue); + } + + group->setTransmitting(true); + group->setURL(url); + + if (inputPorts && inputPorts.IsSequence()) { + for (YAML::const_iterator portIter = inputPorts.begin(); + portIter != inputPorts.end(); ++portIter) { + logger_->log_debug("Got a current port, iterating..."); + + YAML::Node currPort = portIter->as<YAML::Node>(); + + this->parsePortYaml(&currPort, group, SEND); + } // for node + } + if (outputPorts && outputPorts.IsSequence()) { + for (YAML::const_iterator portIter = outputPorts.begin(); + portIter != outputPorts.end(); ++portIter) { + logger_->log_debug("Got a current port, iterating..."); + + YAML::Node currPort = portIter->as<YAML::Node>(); + + this->parsePortYaml(&currPort, group, RECEIVE); + } // for node + } + + } + } + } +} + +void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, + core::ProcessGroup *parent) { + uuid_t uuid; + std::shared_ptr<minifi::Connection> connection = nullptr; + + if (!parent) { + logger_->log_error("parseProcessNode: no parent group was provided"); + return; + } + + if (connectionsNode) { + + if (connectionsNode->IsSequence()) { + for (YAML::const_iterator iter = connectionsNode->begin(); + iter != connectionsNode->end(); ++iter) { + + YAML::Node connectionNode = iter->as<YAML::Node>(); + + std::string name = connectionNode["name"].as<std::string>(); + std::string id = connectionNode["id"].as<std::string>(); + std::string destId = connectionNode["destination id"].as<std::string>(); + + uuid_parse(id.c_str(), uuid); + + logger_->log_debug("Created connection with UUID %s and name %s", + id.c_str(), name.c_str()); + connection = this->createConnection(name, uuid); + auto rawRelationship = connectionNode["source relationship name"] + .as<std::string>(); + core::Relationship relationship(rawRelationship, ""); + logger_->log_debug("parseConnection: relationship => [%s]", + rawRelationship.c_str()); + if (connection) + connection->setRelationship(relationship); + std::string connectionSrcProcId = connectionNode["source id"] + .as<std::string>(); + uuid_t srcUUID; + uuid_parse(connectionSrcProcId.c_str(), srcUUID); + + auto srcProcessor = parent->findProcessor(srcUUID); + + if (!srcProcessor) { + logger_->log_error( + "Could not locate a source with id %s to create a connection", + connectionSrcProcId.c_str()); + throw std::invalid_argument( + "Could not locate a source with id %s to create a connection " + + connectionSrcProcId); + } + + uuid_t destUUID; + uuid_parse(destId.c_str(), destUUID); + auto destProcessor = parent->findProcessor(destUUID); + // If we could not find name, try by UUID + if (!destProcessor) { + uuid_t destUuid; + uuid_parse(destId.c_str(), destUuid); + destProcessor = parent->findProcessor(destUuid); + } + if (destProcessor) { + std::string destUuid = destProcessor->getUUIDStr(); + } + + uuid_t srcUuid; + uuid_t destUuid; + srcProcessor->getUUID(srcUuid); + connection->setSourceUUID(srcUuid); + destProcessor->getUUID(destUuid); + connection->setDestinationUUID(destUuid); + + if (connection) { + parent->addConnection(connection); + } + } + } + + if (connection) + parent->addConnection(connection); + + return; + } +} + +void YamlConfiguration::parsePortYaml(YAML::Node *portNode, + core::ProcessGroup *parent, + TransferDirection direction) { + uuid_t uuid; + std::shared_ptr<core::Processor> processor = NULL; + minifi::RemoteProcessorGroupPort *port = NULL; + + if (!parent) { + logger_->log_error("parseProcessNode: no parent group existed"); + return; + } + + YAML::Node inputPortsObj = portNode->as<YAML::Node>(); + + // generate the random UIID + uuid_generate(uuid); + + auto portId = inputPortsObj["id"].as<std::string>(); + auto nameStr = inputPortsObj["name"].as<std::string>(); + uuid_parse(portId.c_str(), uuid); + + port = new minifi::RemoteProcessorGroupPort(nameStr.c_str(), uuid); + + processor = (std::shared_ptr<core::Processor>) port; + port->setDirection(direction); + port->setTimeOut(parent->getTimeOut()); + port->setTransmitting(true); + processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); + processor->initialize(); + + // handle port properties + YAML::Node nodeVal = portNode->as<YAML::Node>(); + YAML::Node propertiesNode = nodeVal["Properties"]; + + parsePropertiesNodeYaml(&propertiesNode, processor); + + // add processor to parent + parent->addProcessor(processor); + processor->setScheduledState(core::RUNNING); + auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"] + .as<std::string>(); + int64_t maxConcurrentTasks; + if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { + processor->setMaxConcurrentTasks(maxConcurrentTasks); + } + logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", + maxConcurrentTasks); + processor->setMaxConcurrentTasks(maxConcurrentTasks); + +} + +void YamlConfiguration::parsePropertiesNodeYaml( + YAML::Node *propertiesNode, std::shared_ptr<core::Processor> processor) { + // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated + for (YAML::const_iterator propsIter = propertiesNode->begin(); + propsIter != propertiesNode->end(); ++propsIter) { + std::string propertyName = propsIter->first.as<std::string>(); + YAML::Node propertyValueNode = propsIter->second; + if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) { + std::string rawValueString = propertyValueNode.as<std::string>(); + if (!processor->setProperty(propertyName, rawValueString)) { + logger_->log_warn( + "Received property %s with value %s but is not one of the properties for %s", + propertyName.c_str(), rawValueString.c_str(), + processor->getName().c_str()); + } + } + } +} + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/BaseStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp index cf3fe45..1400a1d 100644 --- a/libminifi/src/io/BaseStream.cpp +++ b/libminifi/src/io/BaseStream.cpp @@ -18,6 +18,12 @@ #include "io/BaseStream.h" #include "io/Serializable.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { /** * write 4 bytes to stream * @param base_value non encoded value @@ -26,7 +32,7 @@ * @return resulting write size **/ int BaseStream::write(uint32_t base_value, bool is_little_endian) { - return ::Serializable::write(base_value, (DataStream*) this, is_little_endian); + return Serializable::write(base_value, (DataStream*) this, is_little_endian); } /** @@ -37,7 +43,7 @@ int BaseStream::write(uint32_t base_value, bool is_little_endian) { * @return resulting write size **/ int BaseStream::write(uint16_t base_value, bool is_little_endian) { - return ::Serializable::write(base_value, (DataStream*) this, is_little_endian); + return Serializable::write(base_value, (DataStream*) this, is_little_endian); } /** @@ -48,7 +54,7 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) { * @return resulting write size **/ int BaseStream::write(uint8_t *value, int len) { - return ::Serializable::write(value, len, (DataStream*) this); + return Serializable::write(value, len, (DataStream*) this); } /** @@ -59,7 +65,7 @@ int BaseStream::write(uint8_t *value, int len) { * @return resulting write size **/ int BaseStream::write(uint64_t base_value, bool is_little_endian) { - return ::Serializable::write(base_value, (DataStream*) this, is_little_endian); + return Serializable::write(base_value, (DataStream*) this, is_little_endian); } /** @@ -69,7 +75,7 @@ int BaseStream::write(uint64_t base_value, bool is_little_endian) { **/ int BaseStream::write(bool value) { uint8_t v = value; - return ::Serializable::write(v); + return Serializable::write(v); } /** @@ -78,7 +84,7 @@ int BaseStream::write(bool value) { * @return resulting write size **/ int BaseStream::writeUTF(std::string str, bool widen) { - return ::Serializable::writeUTF(str, (DataStream*) this, widen); + return Serializable::writeUTF(str, (DataStream*) this, widen); } /** @@ -88,7 +94,7 @@ int BaseStream::writeUTF(std::string str, bool widen) { * @return resulting read size **/ int BaseStream::read(uint8_t &value) { - return ::Serializable::read(value, (DataStream*) this); + return Serializable::read(value, (DataStream*) this); } /** @@ -98,7 +104,7 @@ int BaseStream::read(uint8_t &value) { * @return resulting read size **/ int BaseStream::read(uint16_t &base_value, bool is_little_endian) { - return ::Serializable::read(base_value, (DataStream*) this); + return Serializable::read(base_value, (DataStream*) this); } /** @@ -108,7 +114,7 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::read(char &value) { - return ::Serializable::read(value, (DataStream*) this); + return Serializable::read(value, (DataStream*) this); } /** @@ -119,7 +125,7 @@ int BaseStream::read(char &value) { * @return resulting read size **/ int BaseStream::read(uint8_t *value, int len) { - return ::Serializable::read(value, len, (DataStream*) this); + return Serializable::read(value, len, (DataStream*) this); } /** @@ -129,7 +135,7 @@ int BaseStream::read(uint8_t *value, int len) { * @return resulting read size **/ int BaseStream::read(uint32_t &value, bool is_little_endian) { - return ::Serializable::read(value, (DataStream*) this, is_little_endian); + return Serializable::read(value, (DataStream*) this, is_little_endian); } /** @@ -139,7 +145,7 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::read(uint64_t &value, bool is_little_endian) { - return ::Serializable::read(value, (DataStream*) this, is_little_endian); + return Serializable::read(value, (DataStream*) this, is_little_endian); } /** @@ -149,5 +155,12 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::readUTF(std::string &str, bool widen) { - return ::Serializable::readUTF(str, (DataStream*) this, widen); + return Serializable::readUTF(str, (DataStream*) this, widen); } + + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/ClientSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index 39b71b4..ad6b04d 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -30,425 +30,424 @@ #include "io/validation.h" #include "io/ClientSocket.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { std::string Socket::HOSTNAME = Socket::getMyHostName(0); - Socket::Socket(const std::string &hostname, const uint16_t port, - const uint16_t listeners = -1) : - requested_hostname_(hostname), port_(port), addr_info_(0), socket_file_descriptor_( - -1), socket_max_(0), listeners_(listeners), canonical_hostname_("") { - logger_ = Logger::getLogger(); - FD_ZERO(&total_list_); - FD_ZERO(&read_fds_); + const uint16_t listeners = -1) + : requested_hostname_(hostname), + port_(port), + addr_info_(0), + socket_file_descriptor_(-1), + socket_max_(0), + listeners_(listeners), + canonical_hostname_("") { + logger_ = logging::Logger::getLogger(); + FD_ZERO(&total_list_); + FD_ZERO(&read_fds_); } -Socket::Socket(const std::string &hostname, const uint16_t port) : - ::Socket(hostname, port, 0) { +Socket::Socket(const std::string &hostname, const uint16_t port) + : Socket(hostname, port, 0) { } -Socket::Socket(const Socket &&other) : - requested_hostname_(std::move(other.requested_hostname_)), port_( - std::move(other.port_)), addr_info_(std::move(other.addr_info_)), socket_file_descriptor_( - other.socket_file_descriptor_), socket_max_( - other.socket_max_.load()), listeners_(other.listeners_), total_list_( - other.total_list_), read_fds_(other.read_fds_), canonical_hostname_( - std::move(other.canonical_hostname_)) { - logger_ = Logger::getLogger(); +Socket::Socket(const Socket &&other) + : requested_hostname_(std::move(other.requested_hostname_)), + port_(std::move(other.port_)), + addr_info_(std::move(other.addr_info_)), + socket_file_descriptor_(other.socket_file_descriptor_), + socket_max_(other.socket_max_.load()), + listeners_(other.listeners_), + total_list_(other.total_list_), + read_fds_(other.read_fds_), + canonical_hostname_(std::move(other.canonical_hostname_)) { + logger_ = logging::Logger::getLogger(); } Socket::~Socket() { - closeStream(); + closeStream(); } -void Socket::closeStream() -{ - if (0 != addr_info_) { - freeaddrinfo(addr_info_); - addr_info_=0; - } +void Socket::closeStream() { + if (0 != addr_info_) { + freeaddrinfo(addr_info_); + addr_info_ = 0; + } - if (socket_file_descriptor_ >= 0) - { - close(socket_file_descriptor_); - socket_file_descriptor_ = -1; - } + if (socket_file_descriptor_ >= 0) { + close(socket_file_descriptor_); + socket_file_descriptor_ = -1; + } } -int8_t Socket::createConnection(const addrinfo *p,in_addr_t &addr) { - if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, - p->ai_protocol)) == -1) { - logger_->log_error("error while connecting to server socket"); - return -1; - } - - setSocketOptions(socket_file_descriptor_); - - if (listeners_ > 0) { - - struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; - sa_loc->sin_family = AF_INET; - sa_loc->sin_port = htons(port_); - sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); - - if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) { - logger_->log_error("Could not bind to socket", strerror(errno)); - return -1; - } - } - { - if (listeners_ <=0 ) - { - struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; - sa_loc->sin_family = AF_INET; - //sa_loc->sin_port = htons(port); - sa_loc->sin_port = htons(port_); - // use any address if you are connecting to the local machine for testing - // otherwise we must use the requested hostname - if (IsNullOrEmpty(requested_hostname_) || requested_hostname_=="localhost") - { - sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); - } - else - { - sa_loc->sin_addr.s_addr = addr; - } - if (connect(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) { - close(socket_file_descriptor_); - socket_file_descriptor_ = -1; - logger_->log_warn("Could not connect to socket, error:%s", strerror(errno)); - return -1; - - } - } - } - - // listen - if (listeners_ > 0) { - if (listen(socket_file_descriptor_, listeners_) == -1) { - logger_->log_warn("attempted connection, saw %s", strerror(errno)); - return -1; - } - - } - // add the listener to the total set - FD_SET(socket_file_descriptor_, &total_list_); - socket_max_ = socket_file_descriptor_; - return 0; -} +int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { + if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1) { + logger_->log_error("error while connecting to server socket"); + return -1; + } -short Socket::initialize() { + setSocketOptions(socket_file_descriptor_); - struct sockaddr_in servAddr; + if (listeners_ > 0) { - addrinfo hints = { sizeof(addrinfo) }; - memset(&hints, 0, sizeof hints); // make sure the struct is empty - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_CANONNAME; - if (listeners_ > 0) - hints.ai_flags |= AI_PASSIVE; + struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; + sa_loc->sin_family = AF_INET; + sa_loc->sin_port = htons(port_); + sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); - hints.ai_protocol = 0; /* any protocol */ + if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) { + logger_->log_error("Could not bind to socket", strerror(errno)); + return -1; + } + } + { + if (listeners_ <= 0) { + struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; + sa_loc->sin_family = AF_INET; + //sa_loc->sin_port = htons(port); + sa_loc->sin_port = htons(port_); + // use any address if you are connecting to the local machine for testing + // otherwise we must use the requested hostname + if (IsNullOrEmpty(requested_hostname_) + || requested_hostname_ == "localhost") { + sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); + } else { + sa_loc->sin_addr.s_addr = addr; + } + if (connect(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) { + close(socket_file_descriptor_); + socket_file_descriptor_ = -1; + logger_->log_warn("Could not connect to socket, error:%s", + strerror(errno)); + return -1; + + } + } + } - int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_); + // listen + if (listeners_ > 0) { + if (listen(socket_file_descriptor_, listeners_) == -1) { + logger_->log_warn("attempted connection, saw %s", strerror(errno)); + return -1; + } - if (errcode != 0) { - logger_->log_error("Saw error during getaddrinfo, error: %s",strerror(errno)); - return -1; - } + } + // add the listener to the total set + FD_SET(socket_file_descriptor_, &total_list_); + socket_max_ = socket_file_descriptor_; + return 0; +} - socket_file_descriptor_ = -1; +short Socket::initialize() { - in_addr_t addr; - struct hostent *h; - #ifdef __MACH__ - h = gethostbyname(requested_hostname_.c_str()); - #else - const char *host; - uint16_t port; + struct sockaddr_in servAddr; - host = requested_hostname_.c_str(); - port = port_; - char buf[1024]; - struct hostent he; - int hh_errno; - gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); - #endif + addrinfo hints = { sizeof(addrinfo) }; + memset(&hints, 0, sizeof hints); // make sure the struct is empty + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_CANONNAME; + if (listeners_ > 0) + hints.ai_flags |= AI_PASSIVE; - memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + hints.ai_protocol = 0; /* any protocol */ + int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, + &addr_info_); - auto p = addr_info_; - for (; p != NULL; p = p->ai_next) { - if (IsNullOrEmpty(canonical_hostname_)) { - if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname)) - canonical_hostname_ = p->ai_canonname; - } + if (errcode != 0) { + logger_->log_error("Saw error during getaddrinfo, error: %s", + strerror(errno)); + return -1; + } + socket_file_descriptor_ = -1; - //we've successfully connected - if (port_ > 0 && createConnection(p,addr) >= 0) - { - return 0; - break; - } - } + in_addr_t addr; + struct hostent *h; +#ifdef __MACH__ + h = gethostbyname(requested_hostname_.c_str()); +#else + const char *host; + uint16_t port; + + host = requested_hostname_.c_str(); + port = port_; + char buf[1024]; + struct hostent he; + int hh_errno; + gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); +#endif - return -1; + memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + + auto p = addr_info_; + for (; p != NULL; p = p->ai_next) { + if (IsNullOrEmpty(canonical_hostname_)) { + if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname)) + canonical_hostname_ = p->ai_canonname; + } + + //we've successfully connected + if (port_ > 0 && createConnection(p, addr) >= 0) { + return 0; + break; + } + } + + return -1; } short Socket::select_descriptor(const uint16_t msec) { - struct timeval tv; - int retval; - - read_fds_ = total_list_; - - tv.tv_sec = msec / 1000; - tv.tv_usec = (msec % 1000) * 1000; - - std::lock_guard<std::recursive_mutex> guard(selection_mutex_); - - if (msec > 0) - retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv); - else - retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL); - - if (retval < 0) { - logger_->log_error("Saw error during selection, error:%i %s",retval,strerror(errno)); - return retval; - } - - for (int i = 0; i <= socket_max_; i++) { - if (FD_ISSET(i, &read_fds_)) { - - if (i == socket_file_descriptor_) { - if (listeners_ > 0) - { - struct sockaddr_storage remoteaddr; // client address - socklen_t addrlen = sizeof remoteaddr; - int newfd = accept(socket_file_descriptor_, - (struct sockaddr *) &remoteaddr, &addrlen); - FD_SET(newfd, &total_list_); // add to master set - if (newfd > socket_max_) { // keep track of the max - socket_max_ = newfd; - } - return newfd; - - - - } - else{ - return socket_file_descriptor_; - } - // we have a new connection - } else { - // data to be received on i - return i; - } - } - - } - - return -1; + struct timeval tv; + int retval; + + read_fds_ = total_list_; + + tv.tv_sec = msec / 1000; + tv.tv_usec = (msec % 1000) * 1000; + + std::lock_guard<std::recursive_mutex> guard(selection_mutex_); + + if (msec > 0) + retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv); + else + retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL); + + if (retval < 0) { + logger_->log_error("Saw error during selection, error:%i %s", retval, + strerror(errno)); + return retval; + } + + for (int i = 0; i <= socket_max_; i++) { + if (FD_ISSET(i, &read_fds_)) { + + if (i == socket_file_descriptor_) { + if (listeners_ > 0) { + struct sockaddr_storage remoteaddr; // client address + socklen_t addrlen = sizeof remoteaddr; + int newfd = accept(socket_file_descriptor_, + (struct sockaddr *) &remoteaddr, &addrlen); + FD_SET(newfd, &total_list_); // add to master set + if (newfd > socket_max_) { // keep track of the max + socket_max_ = newfd; + } + return newfd; + + } else { + return socket_file_descriptor_; + } + // we have a new connection + } else { + // data to be received on i + return i; + } + } + + } + + return -1; } short Socket::setSocketOptions(const int sock) { - int opt = 1; - bool nagle_off = true; + int opt = 1; + bool nagle_off = true; #ifndef __MACH__ - if (nagle_off) { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *) &opt, sizeof(opt)) - < 0) { - logger_->log_error("setsockopt() TCP_NODELAY failed"); - close(sock); - return -1; - } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, - sizeof(opt)) < 0) { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return -1; - } - } - - int sndsize = 256 * 1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &sndsize, - (int) sizeof(sndsize)) < 0) { - logger_->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - return -1; - } + if (nagle_off) { + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *) &opt, sizeof(opt)) + < 0) { + logger_->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + return -1; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, + sizeof(opt)) < 0) { + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return -1; + } + } + + int sndsize = 256 * 1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &sndsize, + (int) sizeof(sndsize)) < 0) { + logger_->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + return -1; + } #else - if (listeners_ > 0) { - // lose the pesky "address already in use" error message - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, - sizeof(opt)) < 0) { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return -1; - } - } + if (listeners_ > 0) { + // lose the pesky "address already in use" error message + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt)) + < 0) { + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return -1; + } + } #endif - return 0; + return 0; } std::string Socket::getHostname() const { - return canonical_hostname_; + return canonical_hostname_; } int Socket::writeData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) - return -1; - return writeData((uint8_t*) &buf[0], buflen); + if (buf.capacity() < buflen) + return -1; + return writeData((uint8_t*) &buf[0], buflen); } // data stream overrides int Socket::writeData(uint8_t *value, int size) { - int ret = 0, bytes = 0; + int ret = 0, bytes = 0; - while (bytes < size) { + while (bytes < size) { + ret = send(socket_file_descriptor_, value + bytes, size - bytes, 0); + //check for errors + if (ret <= 0) { + close(socket_file_descriptor_); + logger_->log_error("Could not send to %d, error: %s", + socket_file_descriptor_, strerror(errno)); + return ret; + } + bytes += ret; + } - ret = send(socket_file_descriptor_, value + bytes, size - bytes, 0); - //check for errors - if (ret <= 0) { - close(socket_file_descriptor_); - logger_->log_error("Could not send to %d, error: %s",socket_file_descriptor_, strerror(errno)); - return ret; - } - bytes += ret; - - } - - if (ret) - logger_->log_trace("Send data size %d over socket %d", size, - socket_file_descriptor_); + if (ret) + logger_->log_trace("Send data size %d over socket %d", size, + socket_file_descriptor_); - return bytes; + return bytes; } template<typename T> inline std::vector<uint8_t> Socket::readBuffer(const T& t) { - std::vector<uint8_t> buf; - buf.resize(sizeof t); - readData((uint8_t*) &buf[0], sizeof(t)); - return buf; + std::vector<uint8_t> buf; + buf.resize(sizeof t); + readData((uint8_t*) &buf[0], sizeof(t)); + return buf; } +int Socket::write(uint64_t base_value, bool is_little_endian) { -int Socket::write(uint64_t base_value, bool is_little_endian){ - - return Serializable::write(base_value,this,is_little_endian); + return Serializable::write(base_value, this, is_little_endian); } - -int Socket::write(uint32_t base_value, bool is_little_endian){ - return Serializable::write(base_value,this,is_little_endian); +int Socket::write(uint32_t base_value, bool is_little_endian) { + return Serializable::write(base_value, this, is_little_endian); } -int Socket::write(uint16_t base_value, bool is_little_endian){ - return Serializable::write(base_value,this,is_little_endian); +int Socket::write(uint16_t base_value, bool is_little_endian) { + return Serializable::write(base_value, this, is_little_endian); } - int Socket::read(uint64_t &value, bool is_little_endian) { - auto buf = readBuffer(value); - - if (is_little_endian) { - value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) - | ((uint64_t) (buf[2] & 255) << 40) - | ((uint64_t) (buf[3] & 255) << 32) - | ((uint64_t) (buf[4] & 255) << 24) - | ((uint64_t) (buf[5] & 255) << 16) - | ((uint64_t) (buf[6] & 255) << 8) - | ((uint64_t) (buf[7] & 255) << 0); - } else { - value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) - | ((uint64_t) (buf[2] & 255) << 16) - | ((uint64_t) (buf[3] & 255) << 24) - | ((uint64_t) (buf[4] & 255) << 32) - | ((uint64_t) (buf[5] & 255) << 40) - | ((uint64_t) (buf[6] & 255) << 48) - | ((uint64_t) (buf[7] & 255) << 56); - } - return sizeof(value); + auto buf = readBuffer(value); + + if (is_little_endian) { + value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) + | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) + | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16) + | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0); + } else { + value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) + | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) + | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40) + | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56); + } + return sizeof(value); } int Socket::read(uint32_t &value, bool is_little_endian) { - auto buf = readBuffer(value); + auto buf = readBuffer(value); - if (is_little_endian) { - value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; - } else { - value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; + if (is_little_endian) { + value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; + } else { + value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; - } + } - return sizeof(value); + return sizeof(value); } int Socket::read(uint16_t &value, bool is_little_endian) { - auto buf = readBuffer(value); + auto buf = readBuffer(value); - if (is_little_endian) { - value = (buf[0] << 8) | buf[1]; - } else { - value = buf[0] | buf[1] << 8; + if (is_little_endian) { + value = (buf[0] << 8) | buf[1]; + } else { + value = buf[0] | buf[1] << 8; - } - return sizeof(value); + } + return sizeof(value); } int Socket::readData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) - { - buf.resize(buflen); - } - return readData((uint8_t*) &buf[0], buflen); + if (buf.capacity() < buflen) { + buf.resize(buflen); + } + return readData((uint8_t*) &buf[0], buflen); } int Socket::readData(uint8_t *buf, int buflen) { - int total_read = 0; - while (buflen) { - short fd = select_descriptor(1000); - if (fd < 0) { - - logger_->log_info("fd close %i",buflen); - close(socket_file_descriptor_); - return -1; - } - - int bytes_read = recv(fd, buf, buflen, 0); - if (bytes_read <= 0) { - if (bytes_read == 0) - logger_->log_info("Other side hung up on %d", fd); - else { - logger_->log_error("Could not recv on %d, error: %s",fd, strerror(errno)); - } - return -1; - } - - - buflen -= bytes_read; - buf += bytes_read; - total_read += bytes_read; - } - - return total_read; + int total_read = 0; + while (buflen) { + short fd = select_descriptor(1000); + if (fd < 0) { + + logger_->log_info("fd close %i", buflen); + close(socket_file_descriptor_); + return -1; + } + + int bytes_read = recv(fd, buf, buflen, 0); + if (bytes_read <= 0) { + if (bytes_read == 0) + logger_->log_info("Other side hung up on %d", fd); + else { + logger_->log_error("Could not recv on %d, error: %s", fd, + strerror(errno)); + } + return -1; + } + + buflen -= bytes_read; + buf += bytes_read; + total_read += bytes_read; + } + + return total_read; } + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/DataStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp index 3ff0a57..7a10bd9 100644 --- a/libminifi/src/io/DataStream.cpp +++ b/libminifi/src/io/DataStream.cpp @@ -27,6 +27,11 @@ #include <algorithm> #include "io/DataStream.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { int DataStream::writeData(uint8_t *value, int size) { @@ -126,3 +131,10 @@ int DataStream::readData(uint8_t *buf,int buflen) { readBuffer += buflen; return buflen; } + + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/EndianCheck.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/EndianCheck.cpp b/libminifi/src/io/EndianCheck.cpp index f1bc98b..1b5020d 100644 --- a/libminifi/src/io/EndianCheck.cpp +++ b/libminifi/src/io/EndianCheck.cpp @@ -18,5 +18,16 @@ #include "io/EndianCheck.h" - +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { bool EndiannessCheck::IS_LITTLE = EndiannessCheck::is_little_endian(); + + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/Serializable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp index 78dc63c..f8c623a 100644 --- a/libminifi/src/io/Serializable.cpp +++ b/libminifi/src/io/Serializable.cpp @@ -22,7 +22,11 @@ #include <arpa/inet.h> #include "io/DataStream.h" #include "io/Serializable.h" - +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { #define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32)) @@ -216,3 +220,10 @@ int Serializable::writeUTF(std::string str,DataStream *stream, bool widen) { } return ret; } + + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/SocketFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/SocketFactory.cpp b/libminifi/src/io/SocketFactory.cpp deleted file mode 100644 index cbfdf96..0000000 --- a/libminifi/src/io/SocketFactory.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "io/SocketFactory.h" - -#include <atomic> -#include <mutex> - -std::atomic<SocketFactory*> SocketFactory::context_instance_; -std::mutex SocketFactory::context_mutex_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/StreamFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp new file mode 100644 index 0000000..e3aa290 --- /dev/null +++ b/libminifi/src/io/StreamFactory.cpp @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "../../include/io/StreamFactory.h" + +#include <atomic> +#include <mutex> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +std::atomic<StreamFactory*> StreamFactory::context_instance_; +std::mutex StreamFactory::context_mutex_; + + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/TLSSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/TLSSocket.cpp b/libminifi/src/io/TLSSocket.cpp deleted file mode 100644 index 1c81f6c..0000000 --- a/libminifi/src/io/TLSSocket.cpp +++ /dev/null @@ -1,237 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef OPENSSL_SUPPORT -#include "Property.h" -#include "Configure.h" -#include "io/TLSSocket.h" -#include "utils/StringUtils.h" -#include <openssl/ssl.h> -#include <openssl/err.h> - - -std::atomic<TLSContext*> TLSContext::context_instance; -std::mutex TLSContext::context_mutex; - -TLSContext::TLSContext() : - error_value(0), ctx(0), logger_(Logger::getLogger()), configuration( - Configure::getConfigure()) { - -} - -/** - * The memory barrier is defined by the singleton - */ -short TLSContext::initialize() { - if (ctx != 0) { - return error_value; - } - std::string clientAuthStr; - bool needClientCert = true; - if (!(configuration->get(Configure::nifi_security_need_ClientAuth, - clientAuthStr) - && StringUtils::StringToBool(clientAuthStr, needClientCert))) { - needClientCert = true; - } - - SSL_library_init(); - const SSL_METHOD *method; - - OpenSSL_add_all_algorithms(); - SSL_load_error_strings(); - method = TLSv1_2_client_method(); - ctx = SSL_CTX_new(method); - if (ctx == NULL) { - logger_->log_error("Could not create SSL context, error: %s.", - std::strerror(errno)); - error_value = TLS_ERROR_CONTEXT; - return error_value; - } - if (needClientCert) { - std::string certificate; - std::string privatekey; - std::string passphrase; - std::string caCertificate; - - if (!(configuration->get(Configure::nifi_security_client_certificate, - certificate) - && configuration->get( - Configure::nifi_security_client_private_key, privatekey))) { - logger_->log_error( - "Certificate and Private Key PEM file not configured, error: %s.", - std::strerror(errno)); - error_value = TLS_ERROR_PEM_MISSING; - return error_value; - } - // load certificates and private key in PEM format - if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), - SSL_FILETYPE_PEM) <= 0) { - logger_->log_error("Could not create load certificate, error : %s", - std::strerror(errno)); - error_value = TLS_ERROR_CERT_MISSING; - return error_value; - - } - if (configuration->get(Configure::nifi_security_client_pass_phrase, - passphrase)) { - // if the private key has passphase - SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); - } - - - int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), - SSL_FILETYPE_PEM); - if (retp != 1) { - logger_->log_error("Could not create load private key,%i on %s error : %s", - retp,privatekey.c_str(),std::strerror(errno)); - error_value = TLS_ERROR_KEY_ERROR; - return error_value; - } - // verify private key - if (!SSL_CTX_check_private_key(ctx)) { - logger_->log_error( - "Private key does not match the public certificate, error : %s", - std::strerror(errno)); - error_value = TLS_ERROR_KEY_ERROR; - return error_value; - } - // load CA certificates - if (configuration->get(Configure::nifi_security_client_ca_certificate, - caCertificate)) { - retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0); - if (retp==0) { - logger_->log_error( - "Can not load CA certificate, Exiting, error : %s", - std::strerror(errno)); - error_value = TLS_ERROR_CERT_ERROR; - return error_value; - } - } - - logger_->log_info("Load/Verify Client Certificate OK."); - } - return 0; -} - -TLSSocket::~TLSSocket() -{ - if (ssl != 0) - SSL_free(ssl); -} -/** - * Constructor that accepts host name, port and listeners. With this - * contructor we will be creating a server socket - * @param hostname our host name - * @param port connecting port - * @param listeners number of listeners in the queue - */ -TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port, - const uint16_t listeners) : - ::Socket(hostname, port, listeners), ssl(0) { -} - -TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port) : - ::Socket(hostname, port, 0), ssl(0) { -} - -TLSSocket::TLSSocket(const TLSSocket &&d) : - ::Socket(std::move(d)), ssl(0) { -} - -short TLSSocket::initialize() { - TLSContext *context = TLSContext::getInstance(); - short ret = context->initialize(); - Socket::initialize(); - if (!ret) { - // we have s2s secure config - ssl = SSL_new(context->getContext()); - SSL_set_fd(ssl, socket_file_descriptor_); - if (SSL_connect(ssl) == -1) { - logger_->log_error("SSL socket connect failed to %s %d", - requested_hostname_.c_str(), port_); - SSL_free(ssl); - ssl = NULL; - close(socket_file_descriptor_); - return -1; - } else { - logger_->log_info("SSL socket connect success to %s %d", - requested_hostname_.c_str(), port_); - return 0; - } - } - return ret; -} - -short TLSSocket::select_descriptor(const uint16_t msec) { - if (ssl && SSL_pending(ssl)) - return 1; - return Socket::select_descriptor(msec); -} - -int TLSSocket::writeData(std::vector< uint8_t>& buf, int buflen) -{ - return Socket::writeData(buf,buflen); -} - -int TLSSocket::writeData(uint8_t *value, int size) { - if (IsNullOrEmpty(ssl)) - return -1; - // for SSL, wait for the TLS IO is completed - int bytes = 0; - int sent = 0; - while (bytes < size) { - - sent = SSL_write(ssl, value + bytes, size - bytes); - //check for errors - if (sent < 0) { - logger_->log_error("Site2Site Peer socket %d send failed %s", - socket_file_descriptor_, strerror(errno)); - return sent; - } - bytes += sent; - } - return size; -} - -int TLSSocket::readData(uint8_t *buf, int buflen) { - - if (IsNullOrEmpty(ssl)) - return -1; - int total_read = 0; - int status = 0; - while (buflen) { - short fd = select_descriptor(1000); - if (fd <= 0) { - - close(socket_file_descriptor_); - return -1; - } - - int sslStatus; - do { - status = SSL_read(ssl, buf, buflen); - sslStatus = SSL_get_error(ssl, status); - } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ); - - buflen -= status; - buf += status; - total_read += status; - } - - return total_read; -} -#endif // #ifdef OPENSSL_SUPPORT http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/io/tls/TLSSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp new file mode 100644 index 0000000..b2df394 --- /dev/null +++ b/libminifi/src/io/tls/TLSSocket.cpp @@ -0,0 +1,249 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "properties/Configure.h" +#include "io/tls/TLSSocket.h" +#include "utils/StringUtils.h" + +#include "core/Property.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +#include <openssl/ssl.h> +#include <openssl/err.h> + +std::atomic<TLSContext*> TLSContext::context_instance; +std::mutex TLSContext::context_mutex; + +TLSContext::TLSContext() : + error_value(0), ctx(0), logger_(logging::Logger::getLogger()), configuration( + Configure::getConfigure()) { + +} + +/** + * The memory barrier is defined by the singleton + */ +short TLSContext::initialize() { + if (ctx != 0) { + return error_value; + } + std::string clientAuthStr; + bool needClientCert = true; + if (!(configuration->get(Configure::nifi_security_need_ClientAuth, + clientAuthStr) + && org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, needClientCert))) { + needClientCert = true; + } + + SSL_library_init(); + const SSL_METHOD *method; + + OpenSSL_add_all_algorithms(); + SSL_load_error_strings(); + method = TLSv1_2_client_method(); + ctx = SSL_CTX_new(method); + if (ctx == NULL) { + logger_->log_error("Could not create SSL context, error: %s.", + std::strerror(errno)); + error_value = TLS_ERROR_CONTEXT; + return error_value; + } + if (needClientCert) { + std::string certificate; + std::string privatekey; + std::string passphrase; + std::string caCertificate; + + if (!(configuration->get(Configure::nifi_security_client_certificate, + certificate) + && configuration->get( + Configure::nifi_security_client_private_key, privatekey))) { + logger_->log_error( + "Certificate and Private Key PEM file not configured, error: %s.", + std::strerror(errno)); + error_value = TLS_ERROR_PEM_MISSING; + return error_value; + } + // load certificates and private key in PEM format + if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), + SSL_FILETYPE_PEM) <= 0) { + logger_->log_error("Could not create load certificate, error : %s", + std::strerror(errno)); + error_value = TLS_ERROR_CERT_MISSING; + return error_value; + + } + if (configuration->get(Configure::nifi_security_client_pass_phrase, + passphrase)) { + // if the private key has passphase + SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); + } + + + int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), + SSL_FILETYPE_PEM); + if (retp != 1) { + logger_->log_error("Could not create load private key,%i on %s error : %s", + retp,privatekey.c_str(),std::strerror(errno)); + error_value = TLS_ERROR_KEY_ERROR; + return error_value; + } + // verify private key + if (!SSL_CTX_check_private_key(ctx)) { + logger_->log_error( + "Private key does not match the public certificate, error : %s", + std::strerror(errno)); + error_value = TLS_ERROR_KEY_ERROR; + return error_value; + } + // load CA certificates + if (configuration->get(Configure::nifi_security_client_ca_certificate, + caCertificate)) { + retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0); + if (retp==0) { + logger_->log_error( + "Can not load CA certificate, Exiting, error : %s", + std::strerror(errno)); + error_value = TLS_ERROR_CERT_ERROR; + return error_value; + } + } + + logger_->log_info("Load/Verify Client Certificate OK."); + } + return 0; +} + +TLSSocket::~TLSSocket() +{ + if (ssl != 0) + SSL_free(ssl); +} +/** + * Constructor that accepts host name, port and listeners. With this + * contructor we will be creating a server socket + * @param hostname our host name + * @param port connecting port + * @param listeners number of listeners in the queue + */ +TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port, + const uint16_t listeners) : + Socket(hostname, port, listeners), ssl(0) { +} + +TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port) : + Socket(hostname, port, 0), ssl(0) { +} + +TLSSocket::TLSSocket(const TLSSocket &&d) : + Socket(std::move(d)), ssl(0) { +} + +short TLSSocket::initialize() { + TLSContext *context = TLSContext::getInstance(); + short ret = context->initialize(); + Socket::initialize(); + if (!ret) { + // we have s2s secure config + ssl = SSL_new(context->getContext()); + SSL_set_fd(ssl, socket_file_descriptor_); + if (SSL_connect(ssl) == -1) { + logger_->log_error("SSL socket connect failed to %s %d", + requested_hostname_.c_str(), port_); + SSL_free(ssl); + ssl = NULL; + close(socket_file_descriptor_); + return -1; + } else { + logger_->log_info("SSL socket connect success to %s %d", + requested_hostname_.c_str(), port_); + return 0; + } + } + return ret; +} + +short TLSSocket::select_descriptor(const uint16_t msec) { + if (ssl && SSL_pending(ssl)) + return 1; + return Socket::select_descriptor(msec); +} + +int TLSSocket::writeData(std::vector< uint8_t>& buf, int buflen) +{ + return Socket::writeData(buf,buflen); +} + +int TLSSocket::writeData(uint8_t *value, int size) { + if (IsNullOrEmpty(ssl)) + return -1; + // for SSL, wait for the TLS IO is completed + int bytes = 0; + int sent = 0; + while (bytes < size) { + + sent = SSL_write(ssl, value + bytes, size - bytes); + //check for errors + if (sent < 0) { + logger_->log_error("Site2Site Peer socket %d send failed %s", + socket_file_descriptor_, strerror(errno)); + return sent; + } + bytes += sent; + } + return size; +} + +int TLSSocket::readData(uint8_t *buf, int buflen) { + + if (IsNullOrEmpty(ssl)) + return -1; + int total_read = 0; + int status = 0; + while (buflen) { + short fd = select_descriptor(1000); + if (fd <= 0) { + + close(socket_file_descriptor_); + return -1; + } + + int sslStatus; + do { + status = SSL_read(ssl, buf, buflen); + sslStatus = SSL_get_error(ssl, status); + } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ); + + buflen -= status; + buf += status; + total_read += status; + } + + return total_read; +} + + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/AppendHostInfo.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp new file mode 100644 index 0000000..24ccc9a --- /dev/null +++ b/libminifi/src/processors/AppendHostInfo.cpp @@ -0,0 +1,124 @@ +/** + * @file AppendHostInfo.cpp + * AppendHostInfo class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <set> +#include <sys/time.h> +#include <string.h> +#include "processors/AppendHostInfo.h" +#include "core/ProcessContext.h" +#include "core/Property.h" +#include "core/ProcessSession.h" + +#include <netdb.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/ioctl.h> +#include <net/if.h> +#include <arpa/inet.h> + +#include "../../include/core/FlowFile.h" +#include "io/ClientSocket.h" + +#define __USE_POSIX +#include <limits.h> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +#ifndef HOST_NAME_MAX +#define HOST_NAME_MAX 255 +#endif + +const std::string AppendHostInfo::ProcessorName("AppendHostInfo"); +core::Property AppendHostInfo::InterfaceName( + "Network Interface Name", + "Network interface from which to read an IP v4 address", "eth0"); +core::Property AppendHostInfo::HostAttribute( + "Hostname Attribute", + "Flowfile attribute to used to record the agent's hostname", + "source.hostname"); +core::Property AppendHostInfo::IPAttribute( + "IP Attribute", + "Flowfile attribute to used to record the agent's IP address", + "source.ipv4"); +core::Relationship AppendHostInfo::Success( + "success", "success operational on the flow record"); + +void AppendHostInfo::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(InterfaceName); + properties.insert(HostAttribute); + properties.insert(IPAttribute); + setSupportedProperties(properties); + + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void AppendHostInfo::onTrigger( + core::ProcessContext *context, + core::ProcessSession *session) { + std::shared_ptr<core::FlowFile> flow = + session->get(); + if (!flow) + return; + + //Get Hostname + + std::string hostAttribute = ""; + context->getProperty(HostAttribute.getName(), hostAttribute); + flow->addAttribute(hostAttribute.c_str(), + org::apache::nifi::minifi::io::Socket::getMyHostName()); + + //Get IP address for the specified interface + std::string iface; + context->getProperty(InterfaceName.getName(), iface); + //Confirm the specified interface name exists on this device + if (if_nametoindex(iface.c_str()) != 0) { + struct ifreq ifr; + int fd = socket(AF_INET, SOCK_DGRAM, 0); + //Type of address to retrieve - IPv4 IP address + ifr.ifr_addr.sa_family = AF_INET; + //Copy the interface name in the ifreq structure + strncpy(ifr.ifr_name, iface.c_str(), IFNAMSIZ - 1); + ioctl(fd, SIOCGIFADDR, &ifr); + close(fd); + + std::string ipAttribute; + context->getProperty(IPAttribute.getName(), ipAttribute); + flow->addAttribute( + ipAttribute.c_str(), + inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr)); + } + + // Transfer to the relationship + session->transfer(flow, Success); +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
