http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/repository/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp index e6d561a..ac092ea 100644 --- a/libminifi/src/core/repository/FlowFileRepository.cpp +++ b/libminifi/src/core/repository/FlowFileRepository.cpp @@ -18,6 +18,7 @@ #include "core/repository/FlowFileRepository.h" #include <memory> #include <string> +#include <utility> #include <vector> #include "FlowFileRecord.h" @@ -36,24 +37,36 @@ void FlowFileRepository::run() { uint64_t curTime = getTimeMillis(); uint64_t size = repoSize(); if (size >= purgeThreshold) { - std::vector<std::string> purgeList; + std::vector<std::shared_ptr<FlowFileRecord>> purgeList; + std::vector<std::pair<std::string, uint64_t>> keyRemovalList; leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this()); + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); std::string key = it->key().ToString(); if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) { - if ((curTime - eventRead->getEventTime()) > max_partition_millis_) - purgeList.push_back(key); + if ((curTime - eventRead->getEventTime()) > max_partition_millis_) { + purgeList.push_back(eventRead); + keyRemovalList.push_back(std::make_pair(key, it->value().size())); + } } else { logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(), key.c_str()); - purgeList.push_back(key); + keyRemovalList.push_back(std::make_pair(key, it->value().size())); } } delete it; - for (auto eventId : purgeList) { - logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), 
eventId.c_str()); - Delete(eventId); + for (auto eventId : keyRemovalList) { + logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str()); + if (Delete(eventId.first)) { + repo_size_ -= eventId.second; + } + } + + for (const auto &ffr : purgeList) { + auto claim = ffr->getResourceClaim(); + if (claim != nullptr) { + content_repo_->remove(claim); + } } } if (size > max_partition_bytes_) @@ -61,22 +74,23 @@ void FlowFileRepository::run() { else repo_full_ = false; } - return; } -void FlowFileRepository::loadComponent() { - std::vector<std::string> purgeList; +void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { + content_repo_ = content_repo; + std::vector<std::pair<std::string, uint64_t>> purgeList; leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this()); + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); std::string key = it->key().ToString(); + repo_size_ += it->value().size(); if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) { auto search = connectionMap.find(eventRead->getConnectionUuid()); if (search != connectionMap.end()) { // we find the connection for the persistent flowfile, create the flowfile and enqueue that std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead); - std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref); + std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); // set store to repo to true so that we do need to persistent again in enqueue record->setStoredToRepository(true); search->second->put(record); @@ -84,19 +98,19 @@ void 
FlowFileRepository::loadComponent() { if (eventRead->getContentFullPath().length() > 0) { std::remove(eventRead->getContentFullPath().c_str()); } - purgeList.push_back(key); + purgeList.push_back(std::make_pair(key, it->value().size())); } } else { - purgeList.push_back(key); + purgeList.push_back(std::make_pair(key, it->value().size())); } } delete it; - std::vector<std::string>::iterator itPurge; - for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) { - std::string eventId = *itPurge; - logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.c_str()); - Delete(eventId); + for (auto eventId : purgeList) { + logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str()); + if (Delete(eventId.first)) { + repo_size_ -= eventId.second; + } } return;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/repository/VolatileContentRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp new file mode 100644 index 0000000..ac575c5 --- /dev/null +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ -0,0 +1,183 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "core/repository/VolatileContentRepository.h" +#include <cstdio> +#include <string> +#include <memory> +#include <thread> +#include "utils/StringUtils.h" +#include "io/FileStream.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +const char *VolatileContentRepository::minimal_locking = "minimal.locking"; + +bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) { + VolatileRepository::initialize(configure); + resource_claim_comparator_ = [](std::shared_ptr<minifi::ResourceClaim> lhsPtr, std::shared_ptr<minifi::ResourceClaim> rhsPtr) { + if (lhsPtr == nullptr || rhsPtr == nullptr) { + return false; + } + return lhsPtr->getContentFullPath() == rhsPtr->getContentFullPath();}; + resource_claim_check_ = [](std::shared_ptr<minifi::ResourceClaim> claim) { + return claim->getFlowFileRecordOwnedCount() <= 0;}; + claim_reclaimer_ = [&](std::shared_ptr<minifi::ResourceClaim> claim) {if (claim->getFlowFileRecordOwnedCount() <= 0) { + remove(claim); + } + }; + + if (configure != nullptr) { + bool minimize_locking = false; + std::string value; + std::stringstream strstream; + strstream << Configure::nifi_volatile_repository_options << getName() << "." 
<< minimal_locking; + if (configure->get(strstream.str(), value)) { + utils::StringUtils::StringToBool(value, minimize_locking); + minimize_locking_ = minimize_locking; + } + } + if (!minimize_locking_) { + for (auto ent : value_vector_) { + delete ent; + } + value_vector_.clear(); + } + start(); + + return true; +} + +void VolatileContentRepository::stop() { + running_ = false; +} + +void VolatileContentRepository::run() { +} + +void VolatileContentRepository::start() { + if (this->purge_period_ <= 0) + return; + if (running_) + return; + thread_ = std::thread(&VolatileContentRepository::run, shared_from_parent<VolatileContentRepository>()); + thread_.detach(); + running_ = true; + logger_->log_info("%s Repository Monitor Thread Start", name_); +} + +std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) { + logger_->log_debug("enter write"); + { + std::lock_guard<std::mutex> lock(map_mutex_); + auto claim_check = master_list_.find(claim->getContentFullPath()); + if (claim_check != master_list_.end()) { + logger_->log_debug("Creating copy of atomic entry"); + auto ent = claim_check->second->takeOwnership(); + if (ent == nullptr) { + logger_->log_debug("write returns nullptr"); + return nullptr; + } + return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent); + } + } + + int size = 0; + if (__builtin_expect(minimize_locking_ == true, 1)) { + logger_->log_debug("Minimize locking"); + for (auto ent : value_vector_) { + if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) { + std::lock_guard<std::mutex> lock(map_mutex_); + master_list_[claim->getContentFullPath()] = ent; + return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent); + } + size++; + } + } else { + std::lock_guard < std::mutex > lock(map_mutex_); + auto claim_check = master_list_.find(claim->getContentFullPath()); + if (claim_check != 
master_list_.end()) { + logger_->log_debug("Creating copy of atomic entry"); + return std::make_shared < io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, claim_check->second); + } else { + logger_->log_debug("Creating new atomic entry"); + AtomicEntry<std::shared_ptr<minifi::ResourceClaim>> *ent = new AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>(&current_size_, &max_size_); + if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) { + master_list_[claim->getContentFullPath()] = ent; + return std::make_shared< io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent); + } + } + } + logger_->log_debug("write returns nullptr %d", size); + return nullptr; +} + +std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) { + logger_->log_debug("enter read"); + int size = 0; + { + std::lock_guard<std::mutex> lock(map_mutex_); + auto claim_check = master_list_.find(claim->getContentFullPath()); + if (claim_check != master_list_.end()) { + auto ent = claim_check->second->takeOwnership(); + if (ent == nullptr) { + return nullptr; + } + return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent); + } + } + logger_->log_debug("enter read for %s after %d", claim->getContentFullPath(), size); + return nullptr; +} + +bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) { + logger_->log_debug("enter remove for %s, reducing %d", claim->getContentFullPath(), current_size_.load()); + if (__builtin_expect(minimize_locking_ == true, 1)) { + std::lock_guard<std::mutex> lock(map_mutex_); + auto ent = master_list_.find(claim->getContentFullPath()); + if (ent != master_list_.end()) { + // if we cannot remove the entry we will let the owner's destructor + // decrement the reference count and free it + if (ent->second->freeValue(claim)) { + logger_->log_debug("removed %s", 
claim->getContentFullPath()); + return true; + } + master_list_.erase(claim->getContentFullPath()); + } + } else { + std::lock_guard<std::mutex> lock(map_mutex_); + delete master_list_[claim->getContentFullPath()]; + master_list_.erase(claim->getContentFullPath()); + return true; + } + + logger_->log_debug("could not remove %s", claim->getContentFullPath()); + return false; +} + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/repository/VolatileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp index a7e3a51..6e3e1c6 100644 --- a/libminifi/src/core/repository/VolatileRepository.cpp +++ b/libminifi/src/core/repository/VolatileRepository.cpp @@ -16,7 +16,9 @@ * limitations under the License. 
*/ #include "core/repository/VolatileRepository.h" +#include <map> #include <memory> +#include <limits> #include <string> #include <vector> #include "FlowFileRecord.h" @@ -28,33 +30,6 @@ namespace minifi { namespace core { namespace repository { -const char *VolatileRepository::volatile_repo_max_count = "max.count"; - -void VolatileRepository::run() { - repo_full_ = false; -} - -/** - * Purge - */ -void VolatileRepository::purge() { - while (current_size_ > max_size_) { - for (auto ent : value_vector_) { - // let the destructor do the cleanup - RepoValue value; - if (ent->getValue(value)) { - current_size_ -= value.size(); - logger_->log_info("VolatileRepository -- purge %s %d %d %d", value.getKey(), current_size_.load(), max_size_, current_index_.load()); - } - if (current_size_ < max_size_) - break; - } - } -} - -void VolatileRepository::loadComponent() { -} - } /* namespace repository */ } /* namespace core */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 4ce944e..b5d9a8f 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -35,7 +35,8 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root uuid_t uuid; int64_t version = 0; - checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); + checkRequiredField(&rootFlowNode, "name", + CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); std::string flowName = rootFlowNode["name"].as<std::string>(); std::string id = getOrGenerateId(&rootFlowNode); uuid_parse(id.c_str(), uuid); @@ -47,10 +48,8 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root } } - logger_->log_debug( - "parseRootProcessGroup: id => [%s], name => 
[%s]", id, flowName); - std::unique_ptr<core::ProcessGroup> group = - FlowConfiguration::createRootProcessGroup(flowName, uuid, version); + logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName); + std::unique_ptr<core::ProcessGroup> group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version); this->name_ = flowName; @@ -77,7 +76,8 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: core::ProcessorConfig procCfg; YAML::Node procNode = iter->as<YAML::Node>(); - checkRequiredField(&procNode, "name", CONFIG_YAML_PROCESSORS_KEY); + checkRequiredField(&procNode, "name", + CONFIG_YAML_PROCESSORS_KEY); procCfg.name = procNode["name"].as<std::string>(); procCfg.id = getOrGenerateId(&procNode); uuid_parse(procCfg.id.c_str(), uuid); @@ -101,11 +101,13 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: } processor->setName(procCfg.name); - checkRequiredField(&procNode, "scheduling strategy", CONFIG_YAML_PROCESSORS_KEY); + checkRequiredField(&procNode, "scheduling strategy", + CONFIG_YAML_PROCESSORS_KEY); procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>(); logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy); - checkRequiredField(&procNode, "scheduling period", CONFIG_YAML_PROCESSORS_KEY); + checkRequiredField(&procNode, "scheduling period", + CONFIG_YAML_PROCESSORS_KEY); procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>(); logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod); @@ -224,13 +226,15 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) { YAML::Node currRpgNode = iter->as<YAML::Node>(); - checkRequiredField(&currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); + checkRequiredField(&currRpgNode, "name", + 
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); auto name = currRpgNode["name"].as<std::string>(); id = getOrGenerateId(&currRpgNode); logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id); - checkRequiredField(&currRpgNode, "url", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); + checkRequiredField(&currRpgNode, "url", + CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); std::string url = currRpgNode["url"].as<std::string>(); logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url); @@ -266,7 +270,8 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P group->setTransmitting(true); group->setURL(url); - checkRequiredField(&currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); + checkRequiredField(&currRpgNode, "Input Ports", + CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>(); if (inputPorts && inputPorts.IsSequence()) { for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) { @@ -312,9 +317,11 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor YAML::Node node = reportNode->as<YAML::Node>(); - checkRequiredField(&node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY); + checkRequiredField(&node, "scheduling strategy", + CONFIG_YAML_PROVENANCE_REPORT_KEY); auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>(); - checkRequiredField(&node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY); + checkRequiredField(&node, "scheduling period", + CONFIG_YAML_PROVENANCE_REPORT_KEY); auto schedulingPeriodStr = node["scheduling period"].as<std::string>(); core::TimeUnit unit; @@ -423,7 +430,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P // Configure basic connection uuid_t uuid; - checkRequiredField(&connectionNode, "name", CONFIG_YAML_CONNECTIONS_KEY); + checkRequiredField(&connectionNode, "name", + 
CONFIG_YAML_CONNECTIONS_KEY); std::string name = connectionNode["name"].as<std::string>(); std::string id = getOrGenerateId(&connectionNode); uuid_parse(id.c_str(), uuid); @@ -431,7 +439,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P logger_->log_debug("Created connection with UUID %s and name %s", id, name); // Configure connection source - checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY); + checkRequiredField(&connectionNode, "source relationship name", + CONFIG_YAML_CONNECTIONS_KEY); auto rawRelationship = connectionNode["source relationship name"].as<std::string>(); core::Relationship relationship(rawRelationship, ""); logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship); @@ -441,6 +450,24 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P uuid_t srcUUID; + if (connectionNode["max work queue size"]) { + auto max_work_queue_str = connectionNode["max work queue size"].as<std::string>(); + int64_t max_work_queue_size = 0; + if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) { + connection->setMaxQueueSize(max_work_queue_size); + } + logger_->log_debug("Setting %d as the max queue size for %s", max_work_queue_size, name); + } + + if (connectionNode["max work queue data size"]) { + auto max_work_queue_str = connectionNode["max work queue data size"].as<std::string>(); + int64_t max_work_queue_data_size = 0; + if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) { + connection->setMaxQueueDataSize(max_work_queue_data_size); + } + logger_->log_debug("Setting %d as the max queue data size for %s", max_work_queue_data_size, name); + } + if (connectionNode["source id"]) { std::string connectionSrcProcId = connectionNode["source id"].as<std::string>(); uuid_parse(connectionSrcProcId.c_str(), srcUUID); @@ -449,7 +476,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node 
*connectionsNode, core::P name, connectionSrcProcId); } else { // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary - checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY); + checkRequiredField(&connectionNode, "source name", + CONFIG_YAML_CONNECTIONS_KEY); std::string connectionSrcProcName = connectionNode["source name"].as<std::string>(); uuid_t tmpUUID; if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && NULL != parent->findProcessor(tmpUUID)) { @@ -486,7 +514,8 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P } else { // we use the same logic as above for resolving the source processor // for looking up the destination processor in absence of a processor id - checkRequiredField(&connectionNode, "destination name", CONFIG_YAML_CONNECTIONS_KEY); + checkRequiredField(&connectionNode, "destination name", + CONFIG_YAML_CONNECTIONS_KEY); std::string connectionDestProcName = connectionNode["destination name"].as<std::string>(); uuid_t tmpUUID; if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) && @@ -534,7 +563,8 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup * YAML::Node inputPortsObj = portNode->as<YAML::Node>(); // Check for required fields - checkRequiredField(&inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); + checkRequiredField(&inputPortsObj, "name", + CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); auto nameStr = inputPortsObj["name"].as<std::string>(); checkRequiredField(&inputPortsObj, "id", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY, http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/io/AtomicEntryStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/AtomicEntryStream.cpp b/libminifi/src/io/AtomicEntryStream.cpp new file mode 100644 index 0000000..aac9723 --- /dev/null +++ 
b/libminifi/src/io/AtomicEntryStream.cpp @@ -0,0 +1,34 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "io/AtomicEntryStream.h" +#include <vector> +#include <mutex> +#include <string> +#include "io/Serializable.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/io/ClientSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index 57d6f03..5f7f5f4 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -170,6 +170,9 @@ int16_t Socket::initialize() { int hh_errno; gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); #endif + if (h == nullptr) { + return -1; + } memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length); auto p = addr_info_; @@ -197,7 +200,7 @@ int16_t Socket::select_descriptor(const uint16_t msec) { tv.tv_sec = msec / 1000; tv.tv_usec = (msec % 1000) * 1000; - 
std::lock_guard<std::recursive_mutex> guard(selection_mutex_); + std::lock_guard < std::recursive_mutex > guard(selection_mutex_); if (msec > 0) retval = select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv); @@ -241,14 +244,12 @@ int16_t Socket::setSocketOptions(const int sock) { bool nagle_off = true; #ifndef __MACH__ if (nagle_off) { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt)) - < 0) { + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt)) < 0) { logger_->log_error("setsockopt() TCP_NODELAY failed"); close(sock); return -1; } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), - sizeof(opt)) < 0) { + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { logger_->log_error("setsockopt() SO_REUSEADDR failed"); close(sock); return -1; @@ -256,8 +257,7 @@ int16_t Socket::setSocketOptions(const int sock) { } int sndsize = 256 * 1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>( &sndsize), - sizeof(sndsize)) < 0) { + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>(&sndsize), sizeof(sndsize)) < 0) { logger_->log_error("setsockopt() SO_SNDBUF failed"); close(sock); return -1; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/io/FileStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp new file mode 100644 index 0000000..3b2bfe1 --- /dev/null +++ b/libminifi/src/io/FileStream.cpp @@ -0,0 +1,160 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "io/FileStream.h" +#include <fstream> +#include <vector> +#include <memory> +#include <string> +#include "io/validation.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +FileStream::FileStream(const std::string &path) + : logger_(logging::LoggerFactory<FileStream>::getLogger()), + path_(path), + offset_(0) { + file_stream_ = std::unique_ptr<std::fstream>(new std::fstream()); + file_stream_->open(path.c_str(), std::fstream::out | std::fstream::binary); + file_stream_->seekg(0, file_stream_->end); + file_stream_->seekp(0, file_stream_->end); + int len = file_stream_->tellg(); + if (len > 0) { + length_ = len; + } else { + length_ = 0; + } + seek(offset_); +} + +FileStream::FileStream(const std::string &path, uint32_t offset, bool write_enable) + : logger_(logging::LoggerFactory<FileStream>::getLogger()), + path_(path) { + file_stream_ = std::unique_ptr<std::fstream>(new std::fstream()); + if (write_enable) { + file_stream_->open(path.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary); + } else { + file_stream_->open(path.c_str(), std::fstream::in | std::fstream::binary); + } + file_stream_->seekg(0, file_stream_->end); + file_stream_->seekp(0, file_stream_->end); + int len = file_stream_->tellg(); + if (len > 0) { + length_ = len; + } else { + length_ = 0; + } + seek(offset); +} + +void 
FileStream::closeStream() { + std::lock_guard<std::recursive_mutex> lock(file_lock_); + if (file_stream_ != nullptr) { + file_stream_->close(); + file_stream_ = nullptr; + } +} + +void FileStream::seek(uint64_t offset) { + std::lock_guard<std::recursive_mutex> lock(file_lock_); + offset_ = offset; + file_stream_->clear(); + file_stream_->seekg(offset_); + file_stream_->seekp(offset_); +} + +int FileStream::writeData(std::vector<uint8_t> &buf, int buflen) { + if (buf.capacity() < buflen) { + return -1; + } + return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); +} + +// data stream overrides + +int FileStream::writeData(uint8_t *value, int size) { + if (!IsNullOrEmpty(value)) { + std::lock_guard<std::recursive_mutex> lock(file_lock_); + if (file_stream_->write(reinterpret_cast<const char*>(value), size)) { + offset_ += size; + if (offset_ > length_) { + length_ = offset_; + } + file_stream_->seekg(offset_); + file_stream_->flush(); + return size; + } else { + return -1; + } + } else { + return -1; + } +} + +template<typename T> +inline std::vector<uint8_t> FileStream::readBuffer(const T& t) { + std::vector<uint8_t> buf; + buf.resize(sizeof t); + readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t)); + return buf; +} + +int FileStream::readData(std::vector<uint8_t> &buf, int buflen) { + if (buf.capacity() < buflen) { + buf.resize(buflen); + } + int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); + + if (ret < buflen) { + buf.resize(ret); + } + return ret; +} + +int FileStream::readData(uint8_t *buf, int buflen) { + if (!IsNullOrEmpty(buf)) { + std::lock_guard<std::recursive_mutex> lock(file_lock_); + file_stream_->read(reinterpret_cast<char*>(buf), buflen); + if ((file_stream_->rdstate() & (file_stream_->eofbit | file_stream_->failbit)) != 0) { + file_stream_->clear(); + file_stream_->seekg(0, file_stream_->end); + file_stream_->seekp(0, file_stream_->end); + int len = file_stream_->tellg(); + offset_ = len; + length_ = len; + return 
offset_; + } else { + offset_ += buflen; + file_stream_->seekp(offset_); + return buflen; + } + + } else { + return -1; + } +} + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/io/StreamFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp index 7990edd..288f4d1 100644 --- a/libminifi/src/io/StreamFactory.cpp +++ b/libminifi/src/io/StreamFactory.cpp @@ -47,11 +47,11 @@ class SocketCreator : public AbstractStreamFactory { public: template<typename Q = V> ContextTypeCheck<true, std::shared_ptr<Q>> create(const std::shared_ptr<Configure> &configure) { - return std::make_shared<V>(configure); + return std::make_shared < V > (configure); } template<typename Q = V> ContextTypeCheck<false, std::shared_ptr<Q>> create(const std::shared_ptr<Configure> &configure) { - return std::make_shared<SocketContext>(configure); + return std::make_shared < SocketContext > (configure); } SocketCreator<T, V>(std::shared_ptr<Configure> configure) { @@ -69,7 +69,7 @@ class SocketCreator : public AbstractStreamFactory { std::unique_ptr<Socket> createSocket(const std::string &host, const uint16_t port) { T *socket = create(host, port); - return std::unique_ptr<Socket>(socket); + return std::unique_ptr < Socket > (socket); } private: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/processors/ExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp index 323d69a..9c6f732 100644 --- a/libminifi/src/processors/ExecuteProcess.cpp +++ b/libminifi/src/processors/ExecuteProcess.cpp @@ -145,7 +145,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, 
core::ProcessSessi break; logger_->log_info("Execute Command Respond %d", numRead); ExecuteProcess::WriteCallback callback(buffer, numRead); - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); if (!flowFile) continue; flowFile->addAttribute("command", _command.c_str()); @@ -167,7 +167,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi // child exits and close the pipe ExecuteProcess::WriteCallback callback(buffer, totalRead); if (!flowFile) { - flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); if (!flowFile) break; flowFile->addAttribute("command", _command.c_str()); @@ -185,7 +185,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi logger_->log_info("Execute Command Max Respond %d", sizeof(buffer)); ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer)); if (!flowFile) { - flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); if (!flowFile) continue; flowFile->addAttribute("command", _command.c_str()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/processors/GenerateFlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp index 3741a8f..2fee3f2 100644 --- a/libminifi/src/processors/GenerateFlowFile.cpp +++ b/libminifi/src/processors/GenerateFlowFile.cpp @@ -91,7 +91,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes } for (int i = 0; i < batchSize; i++) { // For each batch - std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); if (!flowFile) return; if (fileSize > 0) @@ -114,7 +114,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes GenerateFlowFile::WriteCallback callback(_data, _dataSize); for (int i = 0; i < batchSize; i++) { // For each batch - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); if (!flowFile) return; if (fileSize > 0) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/processors/GetFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp index f1dbb21..723d461 100644 --- a/libminifi/src/processors/GetFile.cpp +++ b/libminifi/src/processors/GetFile.cpp @@ -150,7 +150,7 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses std::string fileName = list.front(); list.pop(); logger_->log_info("GetFile process %s", fileName.c_str()); - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); if (flowFile == nullptr) return; std::size_t found = fileName.find_last_of("/\\"); @@ -172,19 +172,19 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses } bool GetFile::isListingEmpty() { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); return _dirList.empty(); } void GetFile::putListing(std::string fileName) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); _dirList.push(fileName); } 
void GetFile::pollListing(std::queue<std::string> &list, const GetFileRequest &request) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); while (!_dirList.empty() && (request.maxSize == 0 || list.size() < request.maxSize)) { std::string fileName = _dirList.front(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/processors/InvokeHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp index d5097bb..7dc75d2 100644 --- a/libminifi/src/processors/InvokeHTTP.cpp +++ b/libminifi/src/processors/InvokeHTTP.cpp @@ -42,6 +42,8 @@ #include "io/StreamFactory.h" #include "ResourceClaim.h" #include "utils/StringUtils.h" +#include "utils/ByteInputCallBack.h" +#include "utils/HTTPUtils.h" namespace org { namespace apache { @@ -113,7 +115,7 @@ void InvokeHTTP::set_request_method(CURL *curl, const std::string &method) { if (my_method == "POST") { curl_easy_setopt(curl, CURLOPT_POST, 1); } else if (my_method == "PUT") { - curl_easy_setopt(curl, CURLOPT_UPLOAD, 1); + curl_easy_setopt(curl, CURLOPT_PUT, 1); } else if (my_method == "GET") { } else { curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, my_method.c_str()); @@ -210,7 +212,7 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF if (context->getProperty(SSLContext.getName(), context_name) && !IsNullOrEmpty(context_name)) { std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name); if (nullptr != service) { - ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service); + ssl_context_service_ = std::static_pointer_cast < minifi::controllers::SSLContextService > (service); } } } @@ -283,14 +285,14 @@ void InvokeHTTP::configure_secure_connection(CURL *http_session) { } void InvokeHTTP::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get()); logger_->log_info("onTrigger InvokeHTTP with %s", method_.c_str()); if (flowFile == nullptr) { if (!emitFlowFile(method_)) { logger_->log_info("InvokeHTTP -- create flow file with %s", method_.c_str()); - flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); } else { logger_->log_info("exiting because method is %s", method_.c_str()); return; @@ -317,9 +319,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession * if (read_timeout_ > 0) { curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_); } + utils::HTTPRequestResponse content; - curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, - &utils::HTTPRequestResponse::recieve_write); + curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write); curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content)); @@ -333,13 +335,10 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession * callbackObj->ptr = callback; callbackObj->pos = 0; logger_->log_info("InvokeHTTP -- Setting callback"); - curl_easy_setopt(http_session, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, - (curl_off_t)callback->getBufferSize()); - curl_easy_setopt(http_session, CURLOPT_READFUNCTION, - &utils::HTTPRequestResponse::send_write); - curl_easy_setopt(http_session, CURLOPT_READDATA, - static_cast<void*>(callbackObj)); + curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, (curl_off_t)callback->getBufferSize()); + curl_easy_setopt(http_session, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write); + curl_easy_setopt(http_session, CURLOPT_READDATA, 
static_cast<void*>(callbackObj)); + } else { logger_->log_error("InvokeHTTP -- no resource claim"); } @@ -377,14 +376,14 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession * bool output_body_to_content = isSuccess && !putToAttribute; bool body_empty = IsNullOrEmpty(content.data); - logger_->log_info("isSuccess: %d", isSuccess); + logger_->log_info("isSuccess: %d, response code %d ", isSuccess, http_code); std::shared_ptr<FlowFileRecord> response_flow = nullptr; if (output_body_to_content) { if (flowFile != nullptr) { - response_flow = std::static_pointer_cast<FlowFileRecord>(session->create(flowFile)); + response_flow = std::static_pointer_cast < FlowFileRecord > (session->create(flowFile)); } else { - response_flow = std::static_pointer_cast<FlowFileRecord>(session->create()); + response_flow = std::static_pointer_cast < FlowFileRecord > (session->create()); } std::string ct = content_type; @@ -398,7 +397,7 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession * session->importFrom(stream, response_flow); } else { logger_->log_info("Cannot output body to content"); - response_flow = std::static_pointer_cast<FlowFileRecord>(session->create()); + response_flow = std::static_pointer_cast < FlowFileRecord > (session->create()); } route(flowFile, response_flow, session, context, isSuccess, http_code); } else { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/processors/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp index c26b41d..d410547 100644 --- a/libminifi/src/processors/ListenHTTP.cpp +++ b/libminifi/src/processors/ListenHTTP.cpp @@ -201,7 +201,7 @@ ListenHTTP::~ListenHTTP() { } void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->get()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get()); // Do nothing if there are no incoming files if (!flowFile) { @@ -243,7 +243,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection * auto session = _processSessionFactory->createSession(); ListenHTTP::WriteCallback callback(conn, req_info); - auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + auto flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); if (!flowFile) { sendErrorResponse(conn); @@ -295,11 +295,11 @@ ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struc _reqInfo = reqInfo; } -void ListenHTTP::WriteCallback::process(std::ofstream *stream) { +int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) { int64_t rlen; int64_t nlen = 0; int64_t tlen = _reqInfo->content_length; - char buf[16384]; + uint8_t buf[16384]; while (nlen < tlen) { rlen = tlen - nlen; @@ -320,6 +320,8 @@ void ListenHTTP::WriteCallback::process(std::ofstream *stream) { nlen += rlen; } + + return nlen; } } /* namespace processors */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/processors/ListenSyslog.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp index 054d585..a5fdf28 100644 --- a/libminifi/src/processors/ListenSyslog.cpp +++ b/libminifi/src/processors/ListenSyslog.cpp @@ -279,7 +279,7 @@ void ListenSyslog::onTrigger(core::ProcessContext *context, core::ProcessSession SysLogEvent event = eventQueue.front(); eventQueue.pop(); if (firstEvent) { - flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); if (!flowFile) return; 
ListenSyslog::WriteCallback callback(event.payload, event.len); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/processors/LogAttribute.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp index e308901..ad8e664 100644 --- a/libminifi/src/processors/LogAttribute.cpp +++ b/libminifi/src/processors/LogAttribute.cpp @@ -71,8 +71,9 @@ void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession std::shared_ptr<core::FlowFile> flow = session->get(); - if (!flow) + if (!flow) { return; + } std::string value; if (context->getProperty(LogLevel.getName(), value)) { @@ -107,10 +108,10 @@ void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession message << "\n" << "Payload:" << "\n"; ReadCallback callback(flow->getSize()); session->read(flow, &callback); - for (unsigned int i = 0, j = 0; i < callback._readSize; i++) { - message << std::hex << callback._buffer[i]; + for (unsigned int i = 0, j = 0; i < callback.read_size_; i++) { + message << std::hex << callback.buffer_[i]; j++; - if (j == 16) { + if (j == 80) { message << '\n'; j = 0; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/processors/PutFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp index d72c56a..b425ae9 100644 --- a/libminifi/src/processors/PutFile.cpp +++ b/libminifi/src/processors/PutFile.cpp @@ -17,20 +17,26 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include "processors/PutFile.h" -#include <stdio.h> + +#include "../../include/processors/PutFile.h" + +#include <sys/stat.h> +#include <unistd.h> #include <uuid/uuid.h> -#include <sstream> -#include <string> +#include <cstdint> +#include <cstdio> #include <iostream> #include <memory> #include <set> -#include <fstream> -#include "io/validation.h" -#include "utils/StringUtils.h" -#include "utils/TimeUtil.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" +#include <string> + +#include "../../include/core/logging/Logger.h" +#include "../../include/core/ProcessContext.h" +#include "../../include/core/Property.h" +#include "../../include/core/Relationship.h" +#include "../../include/io/BaseStream.h" +#include "../../include/io/DataStream.h" +#include "../../include/io/validation.h" namespace org { namespace apache { @@ -76,7 +82,7 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses return; } - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->get()); // Do nothing if there are no incoming files if (!flowFile) { @@ -142,10 +148,23 @@ PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::strin } // Copy the entire file contents to the temporary file -void PutFile::ReadCallback::process(std::ifstream *stream) { +int64_t PutFile::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) { // Copy file contents into tmp file _writeSucceeded = false; - _tmpFileOs << stream->rdbuf(); + size_t size = 0; + uint8_t buffer[1024]; + do { + int read = stream->read(buffer, 1024); + if (read < 0) { + return -1; + } + if (read == 0) { + break; + } + _tmpFileOs.write(reinterpret_cast<char*>(buffer), read); + size += read; + } while (size < stream->getSize()); + return size; _writeSucceeded = true; } 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/processors/TailFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp index 46ed1fb..f87f4ec 100644 --- a/libminifi/src/processors/TailFile.cpp +++ b/libminifi/src/processors/TailFile.cpp @@ -51,7 +51,8 @@ core::Property TailFile::StateFile("State File", "Specifies the file that should " what data has been ingested so that upon restart NiFi can resume from where it left off", "TailFileState"); core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed" - "from the incoming file.", ""); + "from the incoming file.", + ""); core::Relationship TailFile::Success("success", "All files are routed to success"); void TailFile::initialize() { @@ -240,10 +241,9 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se context->yield(); return; } - - std::size_t found = _currentTailFileName.find_last_of("."); - std::string baseName = _currentTailFileName.substr(0, found); - std::string extension = _currentTailFileName.substr(found + 1); + std::size_t found = _currentTailFileName.find_last_of("."); + std::string baseName = _currentTailFileName.substr(0, found); + std::string extension = _currentTailFileName.substr(found + 1); if (!this->_delimiter.empty()) { char delim = this->_delimiter.c_str()[0]; @@ -254,20 +254,20 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se for (std::shared_ptr<FlowFileRecord> ffr : flowFiles) { logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, ffr->getSize()); std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." 
+ extension; - ffr->updateKeyedAttribute(PATH, fileLocation); - ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath); + ffr->updateKeyedAttribute(PATH, fileLocation); + ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath); ffr->updateKeyedAttribute(FILENAME, logName); - session->transfer(ffr, Success); + session->transfer(ffr, Success); this->_currentTailFilePosition += ffr->getSize() + 1; storeState(); } } else { - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); - if (!flowFile) - return; - flowFile->updateKeyedAttribute(PATH, fileLocation); - flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + if (!flowFile) + return; + flowFile->updateKeyedAttribute(PATH, fileLocation); + flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath); session->import(fullPath, flowFile, true, this->_currentTailFilePosition); session->transfer(flowFile, Success); logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, flowFile->getSize()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/provenance/Provenance.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index 8686a58..3e42a5a 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -17,13 +17,12 @@ */ #include "provenance/Provenance.h" - #include <arpa/inet.h> #include <cstdint> #include <memory> #include <string> #include <vector> - +#include "core/Repository.h" #include "io/DataStream.h" #include "io/Serializable.h" #include "core/logging/Logger.h" @@ -42,30 +41,35 @@ std::shared_ptr<logging::Logger> ProvenanceEventRecord::logger_ = logging::Logge const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", 
"DROP", "EXPIRE", "FORK", "JOIN", "CLONE", "CONTENT_MODIFIED", "ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" }; -ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType) { +ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType) + : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()), + _eventDuration(0), + _entryDate(0), + _lineageStartDate(0) { _eventType = event; _componentId = componentId; _componentType = componentType; _eventTime = getTimeMillis(); - char eventIdStr[37]; - // Generate the global UUID for th event - id_generator_->generate(_eventId); - uuid_unparse_lower(_eventId, eventIdStr); - _eventIdStr = eventIdStr; } // DeSerialize -bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Repository> &repo, std::string key) { +bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) { std::string value; bool ret; - ret = repo->Get(key, value); + const std::shared_ptr<core::Repository> repo = std::dynamic_pointer_cast<core::Repository>(store); + + if (nullptr == repo || IsNullOrEmpty(uuidStr_)) { + logger_->log_error("Repo could not be assigned"); + return false; + } + ret = repo->Get(uuidStr_, value); if (!ret) { - logger_->log_error("NiFi Provenance Store event %s can not found", key.c_str()); + logger_->log_error("NiFi Provenance Store event %s can not be found", uuidStr_); return false; } else { - logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), value.length()); + logger_->log_debug("NiFi Provenance Read event %s length %d", uuidStr_, value.length()); } org::apache::nifi::minifi::io::DataStream stream((const uint8_t*) value.data(), value.length()); @@ -73,20 +77,20 @@ bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Repository> ret = 
DeSerialize(stream); if (ret) { - logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", _eventIdStr.c_str(), stream.getSize(), _eventType); + logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", uuidStr_, stream.getSize(), _eventType); } else { - logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", _eventIdStr.c_str(), stream.getSize(), _eventType); + logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", uuidStr_, stream.getSize(), _eventType); } return ret; } -bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &repo) { +bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableComponent> &repo) { org::apache::nifi::minifi::io::DataStream outStream; int ret; - ret = writeUTF(this->_eventIdStr, &outStream); + ret = writeUTF(this->uuidStr_, &outStream); if (ret <= 0) { return false; } @@ -127,7 +131,7 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &r return false; } - ret = writeUTF(this->uuid_, &outStream); + ret = writeUTF(this->flow_uuid_, &outStream); if (ret <= 0) { return false; } @@ -215,20 +219,20 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &r } } // Persistent to the DB - if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { - logger_->log_debug("NiFi Provenance Store event %s size %d success", _eventIdStr.c_str(), outStream.getSize()); + if (repo->Serialize(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { + logger_->log_debug("NiFi Provenance Store event %s size %d success", uuidStr_, outStream.getSize()); } else { - logger_->log_error("NiFi Provenance Store event %s size %d fail", _eventIdStr.c_str(), outStream.getSize()); + logger_->log_error("NiFi Provenance Store event %s size %d fail", uuidStr_, outStream.getSize()); } return true; } -bool 
ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { +bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) { int ret; org::apache::nifi::minifi::io::DataStream outStream(buffer, bufferSize); - ret = readUTF(this->_eventIdStr, &outStream); + ret = readUTF(this->uuidStr_, &outStream); if (ret <= 0) { return false; @@ -271,7 +275,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferS return false; } - ret = readUTF(this->uuid_, &outStream); + ret = readUTF(this->flow_uuid_, &outStream); if (ret <= 0) { return false; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/provenance/ProvenanceRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp index e4a8ffa..ce19fe4 100644 --- a/libminifi/src/provenance/ProvenanceRepository.cpp +++ b/libminifi/src/provenance/ProvenanceRepository.cpp @@ -31,31 +31,23 @@ void ProvenanceRepository::run() { uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; while (running_) { std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); - std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); uint64_t curTime = getTimeMillis(); uint64_t size = repoSize(); if (size >= purgeThreshold) { - std::vector<std::string> purgeList; leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { ProvenanceEventRecord eventRead; std::string key = it->key().ToString(); - if (eventRead.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size())) { - if ((curTime - eventRead.getEventTime()) > max_partition_millis_) - purgeList.push_back(key); + uint64_t eventTime = eventRead.getEventTime(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), 
it->value().size()); + if (eventTime > 0) { + if ((curTime - eventTime) > max_partition_millis_) + Delete(key); } else { logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str()); - purgeList.push_back(key); + Delete(key); } } delete it; - std::vector<std::string>::iterator itPurge; - - for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) { - std::string eventId = *itPurge; - logger_->log_info("ProvenanceRepository Repo Purge %s", eventId.c_str()); - Delete(eventId); - } } if (size > max_partition_bytes_) repo_full_ = true; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/CPPLINT.cfg ---------------------------------------------------------------------- diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg index beed48a..a1e22ad 100644 --- a/libminifi/test/CPPLINT.cfg +++ b/libminifi/test/CPPLINT.cfg @@ -1,3 +1,4 @@ set noparent filter=-build/include_order,-build/include_alpha exclude_files=Server.cpp +exclude_files=TestBase.cpp \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/TestBase.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp new file mode 100644 index 0000000..4c0814d --- /dev/null +++ b/libminifi/test/TestBase.cpp @@ -0,0 +1,211 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "./TestBase.h" +#include <memory> +#include <vector> +#include <set> +#include <string> + +TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo) + : + content_repo_(content_repo), + flow_repo_(flow_repo), + prov_repo_(prov_repo), + location(-1), + finalized(false), + current_flowfile_(nullptr) { +} + +std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, +bool linkToPrevious) { + if (finalized) { + return nullptr; + } + std::lock_guard<std::recursive_mutex> guard(mutex); + + uuid_t uuid; + uuid_generate(uuid); + + // initialize the processor + processor->initialize(); + + processor_mapping_[processor->getUUIDStr()] = processor; + + if (!linkToPrevious) { + termination_ = relationship; + } else { + std::shared_ptr<core::Processor> last = processor_queue_.back(); + + if (last == nullptr) { + last = processor; + termination_ = relationship; + } + + std::stringstream connection_name; + connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr(); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str()); + connection->setRelationship(relationship); + + // link the connections so that we can test results at the end for this + connection->setSource(last); + connection->setDestination(processor); + + uuid_t uuid_copy, uuid_copy_next; 
+ last->getUUID(uuid_copy); + connection->setSourceUUID(uuid_copy); + processor->getUUID(uuid_copy_next); + connection->setDestinationUUID(uuid_copy_next); + last->addConnection(connection); + if (last != processor) { + processor->addConnection(connection); + } + relationships_.push_back(connection); + } + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + + processor_nodes_.push_back(node); + + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(*(node.get()), controller_services_provider_, flow_repo_, prov_repo_, content_repo_); + processor_contexts_.push_back(context); + + processor_queue_.push_back(processor); + + return processor; +} + +std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, +bool linkToPrevious) { + if (finalized) { + return nullptr; + } + std::lock_guard<std::recursive_mutex> guard(mutex); + + uuid_t uuid; + uuid_generate(uuid); + + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid); + if (nullptr == ptr) { + throw std::exception(); + } + std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr); + + processor->setName(name); + + return addProcessor(processor, name, relationship, linkToPrevious); +} + +bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) { + std::lock_guard<std::recursive_mutex> guard(mutex); + int i = 0; + for (i = 0; i < processor_queue_.size(); i++) { + if (processor_queue_.at(i) == proc) { + break; + } + } + if (i >= processor_queue_.size() || i < 0 || i >= processor_contexts_.size()) { + return false; + } + + return processor_contexts_.at(i)->setProperty(prop, value); +} + +void TestPlan::reset() { + std::lock_guard<std::recursive_mutex> guard(mutex); + process_sessions_.clear(); + factories_.clear(); + 
location = -1; + for (auto proc : processor_queue_) { + while (proc->getActiveTasks() > 0) { + proc->decrementActiveTask(); + } + } +} + +bool TestPlan::runNextProcessor(std::function<void(core::ProcessContext*, core::ProcessSession*)> verify) { + if (!finalized) { + finalize(); + } + std::lock_guard<std::recursive_mutex> guard(mutex); + location++; + std::shared_ptr<core::Processor> processor = processor_queue_.at(location); + std::shared_ptr<core::ProcessContext> context = processor_contexts_.at(location); + std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context.get()); + factories_.push_back(factory); + if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) { + processor->onSchedule(context.get(), factory.get()); + configured_processors_.push_back(processor); + } + std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context.get()); + process_sessions_.push_back(current_session); + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + if (verify != nullptr) { + verify(context.get(), current_session.get()); + } else { + processor->onTrigger(context.get(), current_session.get()); + } + current_session->commit(); + current_flowfile_ = current_session->get(); + return location + 1 < processor_queue_.size(); +} + +std::set<provenance::ProvenanceEventRecord*> TestPlan::getProvenanceRecords() { + return process_sessions_.at(location)->getProvenanceReporter()->getEvents(); +} + +std::shared_ptr<core::FlowFile> TestPlan::getCurrentFlowFile() { + return current_flowfile_; +} + +std::shared_ptr<minifi::Connection> TestPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) { + std::stringstream connection_name; + std::shared_ptr<core::Processor> last = processor; + connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr(); + 
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str()); + connection->setRelationship(termination_); + + // link the connections so that we can test results at the end for this + connection->setSource(last); + if (setDest) + connection->setDestination(processor); + + uuid_t uuid_copy; + last->getUUID(uuid_copy); + connection->setSourceUUID(uuid_copy); + if (setDest) + connection->setDestinationUUID(uuid_copy); + + processor->addConnection(connection); + return connection; +} + +void TestPlan::finalize() { + std::lock_guard<std::recursive_mutex> guard(mutex); + if (relationships_.size() > 0) { + relationships_.push_back(buildFinalConnection(processor_queue_.back())); + } else { + for (auto processor : processor_queue_) { + relationships_.push_back(buildFinalConnection(processor, true)); + } + } + + finalized = true; +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 331df08..47db4c3 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -25,6 +25,8 @@ #include "ResourceClaim.h" #include "catch.hpp" #include <vector> +#include <set> +#include <map> #include "core/logging/Logger.h" #include "core/Core.h" #include "properties/Configure.h" @@ -33,6 +35,14 @@ #include "utils/Id.h" #include "spdlog/sinks/ostream_sink.h" #include "spdlog/sinks/dist_sink.h" +#include "unit/ProvenanceTestHelper.h" +#include "core/Core.h" +#include "core/FlowFile.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessorNode.h" +#include "core/reporting/SiteToSiteProvenanceReportingTask.h" class LogTestController { public: @@ -105,7 +115,7 @@ class LogTestController { std::ostringstream log_output; std::shared_ptr<logging::Logger> logger_; 
- private: + private: class TestBootstrapLogger : public logging::Logger { public: TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger) @@ -138,6 +148,73 @@ class LogTestController { std::vector<std::string> modified_loggers; }; +class TestPlan { + public: + + explicit TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo); + + std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, + core::Relationship relationship = core::Relationship("success", "description"), + bool linkToPrevious = false); + + std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"), + bool linkToPrevious = false); + + bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value); + + void reset(); + + bool runNextProcessor(std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr); + + std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords(); + + std::shared_ptr<core::FlowFile> getCurrentFlowFile(); + + std::shared_ptr<core::Repository> getFlowRepo() { + return flow_repo_; + } + + std::shared_ptr<core::Repository> getProvenanceRepo() { + return prov_repo_; + } + + std::shared_ptr<core::ContentRepository> getContentRepo() { + return content_repo_; + } + + protected: + + void finalize(); + + std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false); + + std::atomic<bool> finalized; + + std::shared_ptr<core::ContentRepository> content_repo_; + + std::shared_ptr<core::Repository> flow_repo_; + std::shared_ptr<core::Repository> prov_repo_; + + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_; + + 
std::recursive_mutex mutex; + + int location; + + std::shared_ptr<core::ProcessSession> current_session_; + std::shared_ptr<core::FlowFile> current_flowfile_; + + std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_; + std::vector<std::shared_ptr<core::Processor>> processor_queue_; + std::vector<std::shared_ptr<core::Processor>> configured_processors_; + std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_; + std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_; + std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_; + std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_; + std::vector<std::shared_ptr<minifi::Connection>> relationships_; + core::Relationship termination_; +}; + class TestController { public: @@ -148,6 +225,25 @@ class TestController { utils::IdGenerator::getIdGenerator()->initialize(std::make_shared<minifi::Properties>()); } + std::shared_ptr<TestPlan> createPlan() + { + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + content_repo->initialize(configuration); + + std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>(); + return std::make_shared<TestPlan>(content_repo, repo, repo); + } + + void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(core::ProcessContext*, core::ProcessSession*)> verify = nullptr) { + + while (plan->runNextProcessor(verify) && runToCompletion) + { + + } + } + ~TestController() { for (auto dir : directories) { DIR *created_dir; @@ -176,6 +272,10 @@ class TestController { } protected: + + std::mutex test_mutex; + //std::map<std::string,> + LogTestController &log; std::vector<char*> directories; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/TestServer.h 
---------------------------------------------------------------------- diff --git a/libminifi/test/TestServer.h b/libminifi/test/TestServer.h new file mode 100644 index 0000000..263a6b3 --- /dev/null +++ b/libminifi/test/TestServer.h @@ -0,0 +1,137 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_TEST_TESTSERVER_H_ +#define LIBMINIFI_TEST_TESTSERVER_H_ +#include <regex.h> +#include <string> +#include <iostream> +#include "civetweb.h" + +/* Server context handle */ +static struct mg_context *ctx; +static std::string resp_str; + +static int responder(struct mg_connection *conn, void *response) { + const char *msg = resp_str.c_str(); + + + mg_printf(conn, "HTTP/1.1 200 OK\r\n" + "Content-Length: %lu\r\n" + "Content-Type: text/plain\r\n" + "Connection: close\r\n\r\n", + resp_str.size()); + + mg_write(conn, msg, resp_str.size()); + + return 200; +} + +void init_webserver() { + mg_init_library(0); +} + +void start_webserver(std::string &port, std::string &rooturi, const std::string &response, struct mg_callbacks *callbacks, std::string &cert) { + + std::cout << "root uri is " << rooturi << ":" << port << "/" << std::endl; + resp_str = response; + const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", cert.c_str(), "ssl_protocol_version", "3", "ssl_cipher_list", + "ECDHE-RSA-AES256-GCM-SHA384:DES-CBC3-SHA:AES128-SHA:AES128-GCM-SHA256", 0 }; + + if (!mg_check_feature(2)) { + std::cerr << "Error: Embedded example built with SSL support, " << "but civetweb library build without" << std::endl; + exit(1); + } + + ctx = mg_start(callbacks, 0, options); + if (ctx == nullptr) { + std::cerr << "Cannot start CivetWeb - mg_start failed." << std::endl; + exit(1); + } + + mg_set_request_handler(ctx, rooturi.c_str(), responder, (void*) &resp_str); + +} + +void start_webserver(std::string &port, std::string &rooturi, const std::string &response) { + + std::cout << "root uri is " << rooturi << ":" << port << "/" << std::endl; + resp_str = response; + + const char *options[] = { "listening_ports", port.c_str(), 0 }; + ctx = mg_start(nullptr, 0, options); + + if (ctx == nullptr) { + std::cerr << "Cannot start CivetWeb - mg_start failed." 
<< std::endl; + exit(1); + } + + mg_set_request_handler(ctx, rooturi.c_str(), responder, (void*) &resp_str); + +} + +bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) { + regex_t regex; + + const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$"; + + int ret = regcomp(&regex, regexstr, REG_EXTENDED); + if (ret) { + return false; + } + + size_t potentialGroups = regex.re_nsub + 1; + regmatch_t groups[potentialGroups]; + if (regexec(&regex, url.c_str(), potentialGroups, groups, 0) == 0) { + for (int i = 0; i < potentialGroups; i++) { + if (groups[i].rm_so == -1) + break; + + std::string str(url.data() + groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so); + switch (i) { + case 1: + scheme = str; + break; + case 3: + port = str; + break; + case 4: + path = str; + break; + default: + break; + } + } + } + if (path.empty() || scheme.empty() || port.empty()) + return false; + + regfree(&regex); + + return true; + +} + +static void stop_webserver() { + /* Stop the server */ + mg_stop(ctx); + + /* Un-initialize the library */ + mg_exit_library(); +} + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/integration/ControllerServiceIntegrationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp index 3f27b66..15720eb 100644 --- a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp +++ b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp @@ -80,18 +80,21 @@ int main(int argc, char **argv) { configuration->set(minifi::Configure::nifi_default_directory, key_dir); std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); - - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new 
core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + content_repo->initialize(configuration); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), - DEFAULT_ROOT_GROUP_NAME, + content_repo, + DEFAULT_ROOT_GROUP_NAME, true); disabled = false; std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>(); - core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location); + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/integration/HttpConfigurationListenerTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/HttpConfigurationListenerTest.cpp b/libminifi/test/integration/HttpConfigurationListenerTest.cpp index a86b884..b559f41 100644 --- a/libminifi/test/integration/HttpConfigurationListenerTest.cpp +++ b/libminifi/test/integration/HttpConfigurationListenerTest.cpp @@ -46,7 +46,7 @@ void waitToVerifyProcessor() { std::this_thread::sleep_for(std::chrono::seconds(10)); } -class ConfigHandler: public 
CivetHandler { +class ConfigHandler : public CivetHandler { public: bool handleGet(CivetServer *server, struct mg_connection *conn) { std::ifstream myfile(test_file_location_.c_str()); @@ -57,8 +57,8 @@ class ConfigHandler: public CivetHandler { std::string str = buffer.str(); myfile.close(); mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - str.length()); + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); mg_printf(conn, "%s", str.c_str()); } else { mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); @@ -75,7 +75,7 @@ int main(int argc, char **argv) { LogTestController::getInstance().setInfo<minifi::HttpConfigurationListener>(); const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; - std::vector < std::string > cpp_options; + std::vector<std::string> cpp_options; for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { cpp_options.push_back(options[i]); } @@ -89,45 +89,32 @@ int main(int argc, char **argv) { h_ex.test_file_location_ = test_file_location = argv[1]; key_dir = argv[2]; } - std::shared_ptr<minifi::Configure> configuration = std::make_shared< - minifi::Configure>(); + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); configuration->set(minifi::Configure::nifi_default_directory, key_dir); - configuration->set(minifi::Configure::nifi_configuration_listener_type, - "http"); - configuration->set( - minifi::Configure::nifi_configuration_listener_pull_interval, "1 sec"); - configuration->set(minifi::Configure::nifi_configuration_listener_http_url, - "http://localhost:9090/config"); + configuration->set(minifi::Configure::nifi_configuration_listener_type, "http"); + configuration->set(minifi::Configure::nifi_configuration_listener_pull_interval, "1 sec"); + configuration->set(minifi::Configure::nifi_configuration_listener_http_url, "http://localhost:9090/config"); 
mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - std::shared_ptr<core::Repository> test_repo = - std::make_shared<TestRepository>(); - std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< - TestFlowRepository>(); - - configuration->set(minifi::Configure::nifi_flow_configuration_file, - test_file_location); - - std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared - < minifi::io::StreamFactory > (configuration); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr - < core::YamlConfiguration - > (new core::YamlConfiguration(test_repo, test_repo, stream_factory, - configuration, test_file_location)); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast - < TestRepository > (test_repo); - - std::shared_ptr<minifi::FlowController> controller = - std::make_shared < minifi::FlowController - > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true); - - core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, - configuration, test_file_location); - - std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( - test_file_location); - std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup - > (ptr.get()); + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared < minifi::io::StreamFactory > (configuration); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr < core::YamlConfiguration + > (new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, 
configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast < TestRepository > (test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared < minifi::FlowController + > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup > (ptr.get()); ptr.release(); controller->load();
