http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index b0fbffa..6485e6c 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -33,11 +33,20 @@ #include <utility> #include <memory> #include <string> +#include "core/state/metrics/QueueMetrics.h" +#include "core/state/metrics/DeviceInformation.h" +#include "core/state/metrics/SystemMetrics.h" +#include "core/state/metrics/ProcessMetrics.h" +#include "core/state/metrics/RepositoryMetrics.h" +#include "core/state/ProcessorController.h" #include "yaml-cpp/yaml.h" +#include "c2/C2Agent.h" #include "core/ProcessContext.h" #include "core/ProcessGroup.h" #include "utils/StringUtils.h" #include "core/Core.h" +#include "core/ClassLoader.h" +#include "SchedulingAgent.h" #include "core/controller/ControllerServiceProvider.h" #include "core/logging/LoggerConfiguration.h" #include "core/repository/FlowFileRepository.h" @@ -58,6 +67,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo max_timer_driven_threads_(0), max_event_driven_threads_(0), running_(false), + c2_enabled_(true), initialized_(false), provenance_repo_(provenance_repo), flow_file_repo_(flow_file_repo), @@ -88,6 +98,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD; running_ = false; initialized_ = false; + c2_initialized_ = false; root_ = nullptr; protocol_ = new FlowControlProtocol(this, configure); @@ -129,11 +140,13 @@ void FlowController::initializePaths(const std::string &adjustedFilename) { // Create the content repo directory if needed struct stat contentDirStat; - if (stat(ResourceClaim::default_directory_path, &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) { - path = realpath(ResourceClaim::default_directory_path, full_path); + minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY); + + if (stat(minifi::default_directory_path.c_str(), &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) { + path = realpath(minifi::default_directory_path.c_str(), full_path); logger_->log_info("FlowController content directory %s", full_path); } else { - if (mkdir(ResourceClaim::default_directory_path, 0777) == -1) { + if (mkdir(minifi::default_directory_path.c_str(), 0777) == -1) { logger_->log_error("FlowController content directory creation failed"); exit(1); } @@ -156,11 +169,11 @@ FlowController::~FlowController() { provenance_repo_ = nullptr; } -bool FlowController::applyConfiguration(std::string &configurePayload) { +bool FlowController::applyConfiguration(const std::string &configurePayload) { std::unique_ptr<core::ProcessGroup> newRoot; try { newRoot = std::move(flow_configuration_->getRootFromPayload(configurePayload)); - } catch (const YAML::Exception& e) { + } catch (...) { logger_->log_error("Invalid configuration payload"); return false; } @@ -170,7 +183,7 @@ bool FlowController::applyConfiguration(std::string &configurePayload) { logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName().c_str(), newRoot->getVersion()); - std::lock_guard < std::recursive_mutex > flow_lock(mutex_); + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); stop(true); waitUnload(30000); this->root_ = std::move(newRoot); @@ -179,8 +192,8 @@ bool FlowController::applyConfiguration(std::string &configurePayload) { return start(); } -void FlowController::stop(bool force) { - std::lock_guard < std::recursive_mutex > flow_lock(mutex_); +int16_t FlowController::stop(bool force, uint64_t timeToWait) { + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); if (running_) { // immediately indicate that we are not running logger_->log_info("Stop Flow Controller"); @@ -193,6 +206,7 @@ void FlowController::stop(bool force) { this->event_scheduler_->stop(); running_ = false; } + return 0; } /** @@ -219,13 +233,12 @@ void FlowController::waitUnload(const uint64_t timeToWaitMs) { } void FlowController::unload() { - std::lock_guard < std::recursive_mutex > flow_lock(mutex_); + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); if (running_) { stop(true); } if (initialized_) { logger_->log_info("Unload Flow Controller"); - root_ = nullptr; initialized_ = false; name_ = ""; } @@ -234,39 +247,34 @@ void FlowController::unload() { } void FlowController::load() { - std::lock_guard < std::recursive_mutex > flow_lock(mutex_); + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); if (running_) { stop(true); } if (!initialized_) { - std::string listenerType; - // grab the value for configuration - if (this->http_configuration_listener_ == nullptr && configuration_->get(Configure::nifi_configuration_listener_type, listenerType)) { - if (listenerType == "http") { - this->http_configuration_listener_ = std::unique_ptr < minifi::HttpConfigurationListener > (new minifi::HttpConfigurationListener(shared_from_this(), configuration_)); - } - } - logger_->log_info("Initializing timers"); + if (nullptr == timer_scheduler_) { - timer_scheduler_ = std::make_shared < TimerDrivenSchedulingAgent - > (std::static_pointer_cast < core::controller::ControllerServiceProvider > (shared_from_this()), provenance_repo_, flow_file_repo_, content_repo_, configuration_); + timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>( + std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_, + configuration_); } if (nullptr == event_scheduler_) { - event_scheduler_ = std::make_shared < EventDrivenSchedulingAgent - > (std::static_pointer_cast < core::controller::ControllerServiceProvider > (shared_from_this()), provenance_repo_, flow_file_repo_, content_repo_, configuration_); + event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>( + std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_, + configuration_); } logger_->log_info("Load Flow Controller from file %s", configuration_filename_.c_str()); - this->root_ = std::shared_ptr < core::ProcessGroup > (flow_configuration_->getRoot(configuration_filename_)); + this->root_ = std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_)); logger_->log_info("Loaded root processor Group"); controller_service_provider_ = flow_configuration_->getControllerServiceProvider(); - std::static_pointer_cast < core::controller::StandardControllerServiceProvider > (controller_service_provider_)->setRootGroup(root_); - std::static_pointer_cast < core::controller::StandardControllerServiceProvider - > (controller_service_provider_)->setSchedulingAgent(std::static_pointer_cast < minifi::SchedulingAgent > (event_scheduler_)); + std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_); + std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent( + std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_)); logger_->log_info("Loaded controller service provider"); // Load Flow File from Repo @@ -277,7 +285,7 @@ void FlowController::load() { } void FlowController::reload(std::string yamlFile) { - std::lock_guard < std::recursive_mutex > flow_lock(mutex_); + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str()); stop(true); unload(); @@ -303,7 +311,7 @@ void FlowController::loadFlowRepo() { this->root_->getConnections(connectionMap); } logger_->log_debug("Number of connections from connectionMap %d", connectionMap.size()); - auto rep = std::dynamic_pointer_cast < core::repository::FlowFileRepository > (flow_file_repo_); + auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>(flow_file_repo_); if (nullptr != rep) { rep->setConnectionMap(connectionMap); } @@ -313,27 +321,149 @@ void FlowController::loadFlowRepo() { } } -bool FlowController::start() { - std::lock_guard < std::recursive_mutex > flow_lock(mutex_); +int16_t FlowController::start() { + std::lock_guard<std::recursive_mutex> flow_lock(mutex_); if (!initialized_) { logger_->log_error("Can not start Flow Controller because it has not been initialized"); - return false; + return -1; } else { if (!running_) { logger_->log_info("Starting Flow Controller"); controller_service_provider_->enableAllControllerServices(); this->timer_scheduler_->start(); this->event_scheduler_->start(); + if (this->root_ != nullptr) { + start_time_ = std::chrono::steady_clock::now(); this->root_->startProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get()); } + initializeC2(); running_ = true; this->protocol_->start(); this->provenance_repo_->start(); this->flow_file_repo_->start(); logger_->log_info("Started Flow Controller"); } - return true; + return 0; + } +} + +void FlowController::initializeC2() { + if (!c2_enabled_) { + return; + } + if (!c2_initialized_) { + std::string c2_enable_str; + + if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) { + bool enable_c2 = true; + utils::StringUtils::StringToBool(c2_enable_str, enable_c2); + c2_enabled_ = enable_c2; + if (!c2_enabled_) { + return; + } + } else { + c2_enabled_ = true; + } + state::StateManager::initialize(); + std::shared_ptr<c2::C2Agent> agent = std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()), std::dynamic_pointer_cast<FlowController>(shared_from_this()), + configuration_); + registerUpdateListener(agent); + } + if (!c2_enabled_) { + return; + } + + c2_initialized_ = true; + metrics_.clear(); + component_metrics_.clear(); + component_metrics_by_id_.clear(); + std::string class_csv; + + if (root_ != nullptr) { + std::shared_ptr<state::metrics::QueueMetrics> queueMetrics = std::make_shared<state::metrics::QueueMetrics>(); + + std::map<std::string, std::shared_ptr<Connection>> connections; + root_->getConnections(connections); + for (auto con : connections) { + queueMetrics->addConnection(con.second); + } + metrics_[queueMetrics->getName()] = queueMetrics; + + std::shared_ptr<state::metrics::RepositoryMetrics> repoMetrics = std::make_shared<state::metrics::RepositoryMetrics>(); + + repoMetrics->addRepository(provenance_repo_); + repoMetrics->addRepository(flow_file_repo_); + + metrics_[repoMetrics->getName()] = repoMetrics; + } + + if (configuration_->get("nifi.flow.metrics.classes", class_csv)) { + std::vector<std::string> classes = utils::StringUtils::split(class_csv, ","); + + for (std::string clazz : classes) { + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz); + + if (nullptr == ptr) { + logger_->log_error("No metric defined for %s", clazz.c_str()); + continue; + } + + std::shared_ptr<state::metrics::Metrics> processor = std::static_pointer_cast<state::metrics::Metrics>(ptr); + + std::lock_guard<std::mutex> lock(metrics_mutex_); + + metrics_[processor->getName()] = processor; + } + } + + // first we should get all component metrics, then + // we will build the mapping + std::vector<std::shared_ptr<core::Processor>> processors; + if (root_ != nullptr) { + root_->getAllProcessors(processors); + for (const auto &processor : processors) { + auto rep = std::dynamic_pointer_cast<state::metrics::MetricsSource>(processor); + // we have a metrics source. + if (nullptr != rep) { + std::vector<std::shared_ptr<state::metrics::Metrics>> metric_vector; + rep->getMetrics(metric_vector); + for (auto metric : metric_vector) { + component_metrics_[metric->getName()] = metric; + } + } + } + } + + std::string class_definitions; + if (configuration_->get("nifi.flow.metrics.class.definitions", class_definitions)) { + std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ","); + + for (std::string metricsClass : classes) { + try { + int id = std::stoi(metricsClass); + std::stringstream option; + option << "nifi.flow.metrics.class.definitions." << metricsClass; + if (configuration_->get(option.str(), class_definitions)) { + std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ","); + + for (std::string clazz : classes) { + std::lock_guard<std::mutex> lock(metrics_mutex_); + auto ret = component_metrics_[clazz]; + if (nullptr == ret) { + ret = metrics_[clazz]; + } + if (nullptr == ret) { + logger_->log_error("No metric defined for %s", clazz.c_str()); + continue; + } + component_metrics_by_id_[id].push_back(ret); + } + } + } catch (...) { + logger_->log_error("Could not create metrics class %s", metricsClass); + } + } } } /** @@ -367,7 +497,7 @@ void FlowController::removeControllerService(const std::shared_ptr<core::control * Enables the controller service services * @param serviceNode service node which will be disabled, along with linked services. */ -void FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +std::future<bool> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { return controller_service_provider_->enableControllerService(serviceNode); } @@ -382,8 +512,8 @@ void FlowController::enableControllerServices(std::vector<std::shared_ptr<core:: * Disables controller services * @param serviceNode service node which will be disabled, along with linked services. */ -void FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - controller_service_provider_->disableControllerService(serviceNode); +std::future<bool> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + return controller_service_provider_->disableControllerService(serviceNode); } /** @@ -474,6 +604,94 @@ void FlowController::enableAllControllerServices() { controller_service_provider_->enableAllControllerServices(); } +int16_t FlowController::applyUpdate(const std::string &configuration) { + applyConfiguration(configuration); + return 0; +} + +int16_t FlowController::clearConnection(const std::string &connection) { + if (root_ != nullptr) { + logger_->log_info("Attempting to clear connection %s", connection); + std::map<std::string, std::shared_ptr<Connection>> connections; + root_->getConnections(connections); + auto conn = connections.find(connection); + if (conn != connections.end()) { + logger_->log_info("Clearing connection %s", connection); + conn->second->drain(); + } + } + return -1; +} + +int16_t FlowController::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint8_t metricsClass) { + auto now = std::chrono::steady_clock::now(); + auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_metrics_capture_).count(); + std::lock_guard<std::mutex> lock(metrics_mutex_); + if (metricsClass == 0) { + for (auto metric : metrics_) { + metric_vector.push_back(metric.second); + } + } else { + auto metrics = component_metrics_by_id_[metricsClass]; + for (const auto &metric : metrics) { + metric_vector.push_back(metric); + } + } + return 0; +} + +std::vector<std::shared_ptr<state::StateController>> FlowController::getAllComponents() { + std::vector<std::shared_ptr<state::StateController>> vec; + vec.push_back(shared_from_this()); + std::vector<std::shared_ptr<core::Processor>> processors; + if (root_ != nullptr) { + root_->getAllProcessors(processors); + for (auto &processor : processors) { + switch (processor->getSchedulingStrategy()) { + case core::SchedulingStrategy::TIMER_DRIVEN: + vec.push_back(std::make_shared<state::ProcessorController>(processor, timer_scheduler_)); + break; + case core::SchedulingStrategy::EVENT_DRIVEN: + vec.push_back(std::make_shared<state::ProcessorController>(processor, event_scheduler_)); + break; + default: + break; + } + } + } + return vec; +} +std::vector<std::shared_ptr<state::StateController>> FlowController::getComponents(const std::string &name) { + std::vector<std::shared_ptr<state::StateController>> vec; + + if (name == "FlowController") { + vec.push_back(shared_from_this()); + } else { + // check processors + std::shared_ptr<core::Processor> processor = root_->findProcessor(name); + if (processor != nullptr) { + switch (processor->getSchedulingStrategy()) { + case core::SchedulingStrategy::TIMER_DRIVEN: + vec.push_back(std::make_shared<state::ProcessorController>(processor, timer_scheduler_)); + break; + case core::SchedulingStrategy::EVENT_DRIVEN: + vec.push_back(std::make_shared<state::ProcessorController>(processor, event_scheduler_)); + break; + default: + break; + } + } + } + + return vec; +} + +uint64_t FlowController::getUptime() { + auto now = std::chrono::steady_clock::now(); + auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time_).count(); + return time_since; +} + } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index efd6fa7..b97f290 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -61,9 +61,11 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository snapshot_ = false; - if (claim_ != nullptr) + if (claim_ != nullptr) { // Increase the flow file record owned count for the resource claim claim_->increaseFlowFileRecordOwnedCount(); + content_full_fath_ = claim->getContentFullPath(); + } } FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::shared_ptr<core::FlowFile> &event, @@ -82,6 +84,7 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository event->getUUID(uuid_); uuid_connection_ = uuidConnection; if (event->getResourceClaim()) { + event->getResourceClaim()->increaseFlowFileRecordOwnedCount(); content_full_fath_ = event->getResourceClaim()->getContentFullPath(); } } @@ -104,8 +107,9 @@ FlowFileRecord::~FlowFileRecord() { claim_->decreaseFlowFileRecordOwnedCount(); std::string value; if (claim_->getFlowFileRecordOwnedCount() <= 0) { - logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str()); - if (!this->stored || !flow_repository_->Get(uuid_str_, value)) { + // we cannot rely on the stored variable here since we + if (flow_repository_ != nullptr && !flow_repository_->Get(uuid_str_, value)) { + logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str()); content_repo_->remove(claim_); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/HttpConfigurationListener.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/HttpConfigurationListener.cpp b/libminifi/src/HttpConfigurationListener.cpp deleted file mode 100644 index 6b3a061..0000000 --- a/libminifi/src/HttpConfigurationListener.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "HttpConfigurationListener.h" -#include "FlowController.h" -#include <curl/easy.h> -#include <iostream> -#include <iterator> -#include <string> -#include <vector> -#include <utility> - -#include "core/logging/Logger.h" -#include "core/ProcessContext.h" -#include "core/Relationship.h" -#include "io/DataStream.h" -#include "io/StreamFactory.h" -#include "utils/StringUtils.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { - -bool HttpConfigurationListener::pullConfiguration(std::string &configuration) { - if (url_.empty()) - return false; - - bool ret = false; - - std::string fullUrl = url_; - - CURL *http_session = curl_easy_init(); - - curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str()); - - if (connect_timeout_ > 0) { - curl_easy_setopt(http_session, CURLOPT_TIMEOUT, connect_timeout_); - } - - if (read_timeout_ > 0) { - curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_); - } - - if (fullUrl.find("https") != std::string::npos) { - securityConfig_.configureSecureConnection(http_session); - } - - utils::HTTPRequestResponse content; - curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write); - - curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content)); - - CURLcode res = curl_easy_perform(http_session); - - if (res == CURLE_OK) { - logger_->log_debug("HttpConfigurationListener -- curl successful to %s", fullUrl.c_str()); - - std::string response_body(content.data.begin(), content.data.end()); - int64_t http_code = 0; - curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, &http_code); - char *content_type; - /* ask for the content-type */ - curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type); - - bool isSuccess = ((int32_t) (http_code / 100)) == 2 && res != CURLE_ABORTED_BY_CALLBACK; - bool body_empty = IsNullOrEmpty(content.data); - - if (isSuccess && !body_empty) { - configuration = std::move(response_body); - logger_->log_debug("config %s", configuration.c_str()); - ret = true; - } else { - logger_->log_error("Cannot output body to content"); - } - } else { - logger_->log_error("HttpConfigurationListener -- curl_easy_perform() failed %s\n", curl_easy_strerror(res)); - } - curl_easy_cleanup(http_session); - - return ret; -} - -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/Properties.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp index 076cefc..abebfbb 100644 --- a/libminifi/src/Properties.cpp +++ b/libminifi/src/Properties.cpp @@ -34,7 +34,7 @@ Properties::Properties() // Get the config value bool Properties::get(std::string key, std::string &value) { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); auto it = properties_.find(key); if (it != properties_.end()) { @@ -46,7 +46,7 @@ bool Properties::get(std::string key, std::string &value) { } int Properties::getInt(const std::string &key, int default_value) { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); auto it = properties_.find(key); if (it != properties_.end()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index bcc3d49..03121a8 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -29,7 +29,7 @@ #include <deque> #include <iostream> #include <set> - +#include <vector> #include <string> #include <type_traits> #include <utility> @@ -43,14 +43,18 @@ #include "core/Property.h" #include "core/Relationship.h" #include "Site2SitePeer.h" +#include "utils/HTTPClient.h" namespace org { namespace apache { namespace nifi { namespace minifi { +const char *RemoteProcessorGroupPort::RPG_SSL_CONTEXT_SERVICE_NAME = "RemoteProcessorGroupPortSSLContextService"; + const char *RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort"); core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", ""); +core::Property RemoteProcessorGroupPort::SSLContext("SSL Context Service", "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", ""); core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", ""); core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies remote NiFi Port UUID.", ""); core::Relationship RemoteProcessorGroupPort::relation; @@ -71,16 +75,20 @@ std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtoc minifi::Site2SitePeerStatus peer; nextProtocol->setPortId(protocol_uuid_); { - std::lock_guard < std::mutex > lock(site2site_peer_mutex_); + std::lock_guard<std::mutex> lock(site2site_peer_mutex_); peer = site2site_peer_status_list_[this->site2site_peer_index_]; site2site_peer_index_++; if (site2site_peer_index_ >= site2site_peer_status_list_.size()) { site2site_peer_index_ = 0; } } + logger_->log_info("creating new protocol with %s and %d", peer.host_, peer.port_); std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(peer.host_, peer.port_)); std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), peer.host_, peer.port_)); nextProtocol->setPeer(std::move(peer_)); + } else { + logger_->log_info("Refreshing the peer list since there are none configured."); + refreshPeerList(); } } } @@ -103,67 +111,72 @@ void RemoteProcessorGroupPort::initialize() { std::set<core::Property> properties; properties.insert(hostName); properties.insert(port); + properties.insert(SSLContext); properties.insert(portUUID); setSupportedProperties(properties); // Set the supported relationships std::set<core::Relationship> relationships; relationships.insert(relation); setSupportedRelationships(relationships); - curl_global_init(CURL_GLOBAL_DEFAULT); - { - std::lock_guard < std::mutex > lock(site2site_peer_mutex_); - if (!url_.empty()) { - refreshPeerList(); - if (site2site_peer_status_list_.size() > 0) + std::lock_guard<std::mutex> lock(site2site_peer_mutex_); + if (!url_.empty()) { + refreshPeerList(); + if (site2site_peer_status_list_.size() > 0) + site2site_peer_index_ = 0; + } + // populate the site2site protocol for load balancing between them + if (site2site_peer_status_list_.size() > 0) { + int count = site2site_peer_status_list_.size(); + if (max_concurrent_tasks_ > count) + count = max_concurrent_tasks_; + for (int i = 0; i < count; i++) { + std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr; + nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(new Site2SiteClientProtocol(nullptr)); + nextProtocol->setPortId(protocol_uuid_); + minifi::Site2SitePeerStatus peer = site2site_peer_status_list_[this->site2site_peer_index_]; + site2site_peer_index_++; + if (site2site_peer_index_ >= site2site_peer_status_list_.size()) { site2site_peer_index_ = 0; - } - // populate the site2site protocol for load balancing between them - if (site2site_peer_status_list_.size() > 0) { - int count = site2site_peer_status_list_.size(); - if (max_concurrent_tasks_ > count) - count = max_concurrent_tasks_; - for (int i = 0; i < count; i++) { - std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr; - nextProtocol = std::unique_ptr < Site2SiteClientProtocol > (new Site2SiteClientProtocol(nullptr)); - nextProtocol->setPortId(protocol_uuid_); - minifi::Site2SitePeerStatus peer = site2site_peer_status_list_[this->site2site_peer_index_]; - site2site_peer_index_++; - if (site2site_peer_index_ >= site2site_peer_status_list_.size()) { - site2site_peer_index_ = 0; - } - std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr < org::apache::nifi::minifi::io::DataStream > (stream_factory_->createSocket(peer.host_, peer.port_)); - std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer > (new Site2SitePeer(std::move(str), peer.host_, peer.port_)); - nextProtocol->setPeer(std::move(peer_)); - returnProtocol(std::move(nextProtocol)); } + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(peer.host_, peer.port_)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), peer.host_, peer.port_)); + nextProtocol->setPeer(std::move(peer_)); + returnProtocol(std::move(nextProtocol)); } } } void RemoteProcessorGroupPort::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { std::string value; - - int64_t lvalue; - if (context->getProperty(portUUID.getName(), value)) { uuid_parse(value.c_str(), protocol_uuid_); } + std::string context_name; + if (!context->getProperty(SSLContext.getName(), context_name) || IsNullOrEmpty(context_name)) { + context_name = RPG_SSL_CONTEXT_SERVICE_NAME; + } + std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name); + if (nullptr != service) { + ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service); + } } void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - if (!transmitting_) + if (!transmitting_) { return; + } std::string value; - int64_t lvalue; - - if (context->getProperty(hostName.getName(), value) && !value.empty()) { - host_ = value; - } + if (url_.empty()) { + if (context->getProperty(hostName.getName(), value) && !value.empty()) { + host_ = value; + } - if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) { - port_ = static_cast<int> (lvalue); + int64_t lvalue; + if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) { + port_ = static_cast<int>(lvalue); + } } if (context->getProperty(portUUID.getName(), value) && !value.empty()) { @@ -175,13 +188,15 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr protocol_ = getNextProtocol(); if (!protocol_) { + logger_->log_info("no protocol"); context->yield(); return; } + logger_->log_info("got protocol"); if (!protocol_->bootstrap()) { - // bootstrap the client protocol if needeed + // bootstrap the client protocol if needed context->yield(); - std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor > (context->getProcessorNode().getProcessor()); + std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode()->getProcessor()); logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec()); return; @@ -202,14 +217,11 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr context->yield(); session->rollback(); } - - - throw std::exception(); } void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { if (this->host_.empty() || this->port_ == -1 || this->protocol_.empty()) - return; + return; std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller"; @@ -221,52 +233,29 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { if (!rest_user_name_.empty()) { std::string loginUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/access/token"; - token = utils::get_token(loginUrl, this->rest_user_name_, this->rest_password_, this->securityConfig_); - logger_->log_debug("Token from NiFi REST Api endpoint %s", token); + utils::HTTPClient client(loginUrl, ssl_service); + client.setVerbose(); + token = utils::get_token(client, this->rest_user_name_, this->rest_password_); + logger_->log_debug("Token from NiFi REST Api endpoint %s, %s", loginUrl, token); if (token.empty()) - return; + return; } - CURL *http_session = curl_easy_init(); + utils::HTTPClient client(fullUrl.c_str(), ssl_service); - if (fullUrl.find("https") != std::string::npos) { - this->securityConfig_.configureSecureConnection(http_session); - } + client.initialize("GET"); struct curl_slist *list = NULL; if (!token.empty()) { std::string header = "Authorization: " + token; list = curl_slist_append(list, header.c_str()); - curl_easy_setopt(http_session, CURLOPT_HTTPHEADER, list); + client.setHeaders(list); } - curl_easy_setopt(http_session, CURLOPT_URL, fullUrl.c_str()); - - utils::HTTPRequestResponse content; - curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, - &utils::HTTPRequestResponse::recieve_write); - - curl_easy_setopt(http_session, CURLOPT_WRITEDATA, - static_cast<void*>(&content)); - - CURLcode res = curl_easy_perform(http_session); - if (list) - curl_slist_free_all(list); - - if (res == CURLE_OK) { - std::string response_body(content.data.begin(), content.data.end()); - int64_t http_code = 0; - curl_easy_getinfo(http_session, CURLINFO_RESPONSE_CODE, &http_code); - char *content_type; - /* ask for the content-type */ - curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type); - - bool isSuccess = ((int32_t) (http_code / 100)) == 2 - && res != CURLE_ABORTED_BY_CALLBACK; - bool body_empty = IsNullOrEmpty(content.data); - - if (isSuccess && !body_empty) { - std::string controller = std::move(response_body); + if (client.submit() && client.getResponseCode() == 200) { + const std::vector<char> &response_body = client.getResponseBody(); + if (!response_body.empty()) { + std::string controller = std::string(response_body.begin(), response_body.end()); logger_->log_debug("controller config %s", controller.c_str()); Json::Value value; Json::Reader reader; @@ -284,14 +273,11 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { logger_->log_info("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_); } } else { - logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %d from %s", http_code, fullUrl); + logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %d from %s", client.getResponseCode(), fullUrl); } } else { - logger_->log_error( - "ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed %s\n", - curl_easy_strerror(res)); + logger_->log_error("ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed \n"); } - curl_easy_cleanup(http_session); } void RemoteProcessorGroupPort::refreshPeerList() { @@ -301,17 +287,16 @@ void RemoteProcessorGroupPort::refreshPeerList() { this->site2site_peer_status_list_.clear(); - std::unique_ptr < Site2SiteClientProtocol> protocol; - protocol = std::unique_ptr < Site2SiteClientProtocol - > (new Site2SiteClientProtocol(nullptr)); + std::unique_ptr<Site2SiteClientProtocol> protocol; + protocol = std::unique_ptr<Site2SiteClientProtocol>(new Site2SiteClientProtocol(nullptr)); protocol->setPortId(protocol_uuid_); - std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = - std::unique_ptr < org::apache::nifi::minifi::io::DataStream - > (stream_factory_->createSocket(host_, site2site_port_)); - std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer - > (new Site2SitePeer(std::move(str), host_, site2site_port_)); + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(host_, site2site_port_)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), host_, site2site_port_)); protocol->setPeer(std::move(peer_)); protocol->getPeerList(site2site_peer_status_list_); + + if (site2site_peer_status_list_.size() > 0) + site2site_peer_index_ = 0; } } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/ResourceClaim.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp index e7d4557..783a108 100644 --- a/libminifi/src/ResourceClaim.cpp +++ b/libminifi/src/ResourceClaim.cpp @@ -35,11 +35,14 @@ namespace minifi { utils::NonRepeatingStringGenerator ResourceClaim::non_repeating_string_generator_; -char *ResourceClaim::default_directory_path = const_cast<char*>(DEFAULT_CONTENT_DIRECTORY); +std::string default_directory_path = ""; + +void setDefaultDirectory(std::string path) { + default_directory_path = path; +} ResourceClaim::ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, const std::string contentDirectory) - : _flowFileRecordOwnedCount(0), - claim_manager_(claim_manager), + : claim_manager_(claim_manager), deleted_(false), logger_(logging::LoggerFactory<ResourceClaim>::getLogger()) { // Create the full content path for the content http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index e228ba5..4a227b5 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -39,10 +39,10 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) { return false; } -void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - logger_->log_trace("Enabling CSN in SchedulingAgent %s", serviceNode->getName()); +std::future<bool> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName()); // reference the enable function from serviceNode - std::function < bool() > f_ex = [serviceNode] { + std::function< bool()> f_ex = [serviceNode] { return serviceNode->enable(); }; // create a functor that will be submitted to the thread pool. @@ -51,11 +51,14 @@ void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller:: // we aren't terribly concerned with the result. std::future<bool> future; component_lifecycle_thread_pool_.execute(std::move(functor), future); + future.wait(); + return future; } -void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +std::future<bool> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName()); // reference the disable function from serviceNode - std::function < bool() > f_ex = [serviceNode] { + std::function< bool()> f_ex = [serviceNode] { return serviceNode->disable(); }; // create a functor that will be submitted to the thread pool. @@ -64,15 +67,19 @@ void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller: // we aren't terribly concerned with the result. std::future<bool> future; component_lifecycle_thread_pool_.execute(std::move(functor), future); + future.wait(); + return future; } bool SchedulingAgent::hasTooMuchOutGoing(std::shared_ptr<core::Processor> processor) { return processor->flowFilesOutGoingFull(); } -bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) { - if (processor->isYield()) +bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, std::shared_ptr<core::ProcessContext> processContext, std::shared_ptr<core::ProcessSessionFactory> sessionFactory) { + if (processor->isYield()) { + logger_->log_debug("Not running %s since it must yield", processor->getName()); return false; + } // No need to yield, reset yield expiration to 0 processor->clearYield(); @@ -89,6 +96,7 @@ bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, core processor->incrementActiveTasks(); try { + logger_->log_debug("Triggering %s", processor->getName()); processor->onTrigger(processContext, sessionFactory); processor->decrementActiveTask(); } catch (Exception &exception) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp index 024bd35..8b8b646 100644 --- a/libminifi/src/Site2SiteClientProtocol.cpp +++ b/libminifi/src/Site2SiteClientProtocol.cpp @@ -101,32 +101,32 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() { } logger_->log_info("status code is %i", statusCode); switch (statusCode) { - case RESOURCE_OK: - logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { - return false; - } - logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); - for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { - if (serverVersion >= _supportedVersion[i]) { - _currentVersion = _supportedVersion[i]; - _currentVersionIndex = i; - return initiateResourceNegotiation(); + case RESOURCE_OK: + logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { + return false; } - } - ret = -1; - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Negotiate protocol response ABORT"); - ret = -1; - return false; - default: - logger_->log_info("Negotiate protocol response unknown code %d", statusCode); - return true; + logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); + for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedVersion[i]) { + _currentVersion = _supportedVersion[i]; + _currentVersionIndex = i; + return initiateResourceNegotiation(); + } + } + ret = -1; + return false; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Negotiate protocol response ABORT"); + ret = -1; + return false; + default: + logger_->log_info("Negotiate protocol response unknown code %d", statusCode); + return true; } return true; @@ -163,32 +163,32 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() { } switch (statusCode) { - case RESOURCE_OK: - logger_->log_info("Site2Site Codec Negotiate version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { - return false; - } - logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); - for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { - if (serverVersion >= _supportedCodecVersion[i]) { - _currentCodecVersion = _supportedCodecVersion[i]; - _currentCodecVersionIndex = i; - return initiateCodecResourceNegotiation(); + case RESOURCE_OK: + logger_->log_info("Site2Site Codec Negotiate version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { + return false; } - } - ret = -1; - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Codec Negotiate response ABORT"); - ret = -1; - return false; - default: - logger_->log_info("Negotiate Codec response unknown code %d", statusCode); - return true; + logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); + for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedCodecVersion[i]) { + _currentCodecVersion = _supportedCodecVersion[i]; + _currentCodecVersionIndex = i; + return initiateCodecResourceNegotiation(); + } + } + ret = -1; + return false; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Codec Negotiate response ABORT"); + ret = -1; + return false; + default: + logger_->log_info("Negotiate Codec response unknown code %d", statusCode); + return true; } return true; @@ -213,7 +213,7 @@ bool Site2SiteClientProtocol::handShake() { return false; } - std::map < std::string, std::string > properties; + std::map<std::string, std::string> properties; properties[HandShakePropertyStr[GZIP]] = "false"; properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr; properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut); @@ -262,20 +262,20 @@ bool Site2SiteClientProtocol::handShake() { } switch (code) { - case PROPERTIES_OK: - logger_->log_info("Site2Site HandShake Completed"); - _peerState = HANDSHAKED; - return true; - case PORT_NOT_IN_VALID_STATE: - case UNKNOWN_PORT: - case PORTS_DESTINATION_FULL: - logger_->log_error("Site2Site HandShake Failed because destination port is either invalid or full"); - ret = -1; - return false; - default: - logger_->log_info("HandShake Failed because of unknown respond code %d", code); - ret = -1; - return false; + case PROPERTIES_OK: + logger_->log_info("Site2Site HandShake Completed"); + _peerState = HANDSHAKED; + return true; + case PORT_NOT_IN_VALID_STATE: + case UNKNOWN_PORT: + case PORTS_DESTINATION_FULL: + logger_->log_error("Site2Site HandShake Failed because destination port, %s, is either invalid or full", _portIdStr); + ret = -1; + return false; + default: + logger_->log_info("HandShake Failed because of unknown respond code %d", code); + ret = -1; + return false; } return false; @@ -523,27 +523,27 @@ Transaction* Site2SiteClientProtocol::createTransaction(std::string &transaction org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get()); switch (code) { - case MORE_DATA: - dataAvailable = true; - logger_->log_info("Site2Site peer indicates that data is available"); - transaction = new Transaction(direction, crcstream); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - transaction->setDataAvailable(dataAvailable); - logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); - return transaction; - case NO_MORE_DATA: - dataAvailable = false; - logger_->log_info("Site2Site peer indicates that no data is available"); - transaction = new Transaction(direction, crcstream); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - transaction->setDataAvailable(dataAvailable); - logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); - return transaction; - default: - logger_->log_info("Site2Site got unexpected response %d when asking for data", code); - return NULL; + case MORE_DATA: + dataAvailable = true; + logger_->log_info("Site2Site peer indicates that data is available"); + transaction = new Transaction(direction, crcstream); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); + return transaction; + case NO_MORE_DATA: + dataAvailable = false; + logger_->log_info("Site2Site peer indicates that no data is available"); + transaction = new Transaction(direction, crcstream); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); + return transaction; + default: + logger_->log_info("Site2Site got unexpected response %d when asking for data", code); + return NULL; } } else { ret = writeRequestType(SEND_FLOWFILES); @@ -661,40 +661,45 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *pac return true; } -bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session) { +int16_t Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session) { int ret; Transaction *transaction = NULL; + if (flowFile && !flowFile->getResourceClaim()->exists()) { + logger_->log_info("Claim %s does not exist for FlowFile %s", flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr()); + return -2; + } + if (_peerState != READY) { bootstrap(); } if (_peerState != READY) { - return false; + return -1; } std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); if (it == _transactionMap.end()) { - return false; + return -1; } else { transaction = it->second; } if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) { logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); - return false; + return -1; } if (transaction->getDirection() != SEND) { logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); - return false; + return -1; } if (transaction->_transfers > 0) { ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"); if (ret <= 0) { - return false; + return -1; } } @@ -702,7 +707,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet uint32_t numAttributes = packet->_attributes.size(); ret = transaction->getStream().write(numAttributes); if (ret != 4) { - return false; + return -1; } std::map<std::string, std::string>::iterator itAttribute; @@ -710,11 +715,11 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet ret = transaction->getStream().writeUTF(itAttribute->first, true); if (ret <= 0) { - return false; + return -1; } ret = transaction->getStream().writeUTF(itAttribute->second, true); if (ret <= 0) { - return false; + return -1; } logger_->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), itAttribute->first.c_str(), itAttribute->second.c_str()); } @@ -724,13 +729,15 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet len = flowFile->getSize(); ret = transaction->getStream().write(len); if (ret != 8) { - return false; + logger_->log_info("ret != 8"); + return -1; } if (flowFile->getSize() > 0) { Site2SiteClientProtocol::ReadCallback callback(packet); session->read(flowFile, &callback); if (flowFile->getSize() != packet->_size) { - return false; + logger_->log_info("MisMatched sizes %d %d", flowFile->getSize(), packet->_size); + return -2; } } if (packet->payload_.length() == 0 && len == 0) { @@ -744,12 +751,13 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet ret = transaction->getStream().write(len); if (ret != 8) { - return false; + return -1; } ret = transaction->getStream().writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), len); if (ret != len) { - return false; + logger_->log_info("ret != len"); + return -1; } packet->_size += len; } @@ -759,7 +767,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet transaction->_bytes += len; logger_->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes); - return true; + return 0; } void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, core::ProcessSession *session) { @@ -775,7 +783,6 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co context->yield(); tearDown(); throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); - return; } // Create the transaction @@ -786,12 +793,11 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co context->yield(); tearDown(); throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); - return; } try { while (true) { - std::map < std::string, std::string > empty; + std::map<std::string, std::string> empty; uint64_t startTime = getTimeMillis(); std::string payload; DataPacket packet(this, transaction, empty, payload); @@ -799,17 +805,15 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co if (!receive(transactionID, &packet, eof)) { throw Exception(SITE2SITE_EXCEPTION, "Receive Failed"); - return; } if (eof) { // transaction done break; } - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (!flowFile) { throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); - return; } std::map<std::string, std::string>::iterator it; std::string sourceIdentifier; @@ -824,7 +828,6 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co session->write(flowFile, &callback); if (flowFile->getSize() != packet._size) { throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right"); - return; } } core::Relationship relation; // undefined relationship @@ -840,11 +843,9 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, co if (!confirm(transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed"); - return; } if (!complete(transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed"); - return; } logger_->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", transactionID.c_str(), transfers, bytes); // we yield the receive if we did not get anything @@ -962,12 +963,6 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { } else { logger_->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str()); ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM"); - /* - ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); - if (ret <= 0) - return false; - transaction->_state = TRANSACTION_CONFIRMED; - return true; */ return false; } } @@ -1103,7 +1098,7 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) { } void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast < FlowFileRecord > (session->get()); + std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast<FlowFileRecord>(session->get()); Transaction *transaction = NULL; @@ -1119,7 +1114,6 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c context->yield(); tearDown(); throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); - return; } // Create the transaction @@ -1130,7 +1124,6 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c context->yield(); tearDown(); throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); - return; } bool continueTransaction = true; @@ -1142,22 +1135,25 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c std::string payload; DataPacket packet(this, transaction, flow->getAttributes(), payload); - if (!send(transactionID, &packet, flow, session)) { + int16_t resp = send(transactionID, &packet, flow, session); + if (resp == -1) { throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); - return; } + logger_->log_info("Site2Site transaction %s send flow record %s", transactionID.c_str(), flow->getUUIDStr().c_str()); - uint64_t endTime = getTimeMillis(); - std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr(); - std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName(); - session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false); + if (resp == 0) { + uint64_t endTime = getTimeMillis(); + std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr(); + std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName(); + session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false); + } session->remove(flow); uint64_t transferNanos = getTimeNano() - startSendingNanos; if (transferNanos > _batchSendNanos) break; - flow = std::static_pointer_cast < FlowFileRecord > (session->get()); + flow = std::static_pointer_cast<FlowFileRecord>(session->get()); if (!flow) { continueTransaction = false; @@ -1168,13 +1164,11 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, c std::stringstream ss; ss << "Confirm Failed for " << transactionID; throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str()); - return; } if (!complete(transactionID)) { std::stringstream ss; ss << "Complete Failed for " << transactionID; throw Exception(SITE2SITE_EXCEPTION, ss.str().c_str()); - return; } logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes); } catch (std::exception &exception) { @@ -1212,7 +1206,6 @@ void Site2SiteClientProtocol::transferString(core::ProcessContext *context, core context->yield(); tearDown(); throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); - return; } // Create the transaction @@ -1223,25 +1216,22 @@ void Site2SiteClientProtocol::transferString(core::ProcessContext *context, core context->yield(); tearDown(); throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); - return; } try { DataPacket packet(this, transaction, attributes, payload); - if (!send(transactionID, &packet, nullptr, session)) { + int16_t resp = send(transactionID, &packet, nullptr, session); + if (resp == -1) { throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); - return; } logger_->log_info("Site2Site transaction %s send bytes length %d", transactionID.c_str(), payload.length()); if (!confirm(transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); - return; } if (!complete(transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); - return; } logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes); } catch (std::exception &exception) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/ThreadedSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 82d4dfd..d74a74a 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -37,7 +37,7 @@ namespace nifi { namespace minifi { void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processor) { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); admin_yield_duration_ = 0; std::string yieldValue; @@ -67,20 +67,24 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo return; } - core::ProcessorNode processor_node(processor); - auto processContext = std::make_shared < core::ProcessContext > (processor_node, controller_service_provider_, repo_, flow_repo_, content_repo_); - auto sessionFactory = std::make_shared < core::ProcessSessionFactory > (processContext.get()); + std::shared_ptr<core::ProcessorNode> processor_node = std::make_shared<core::ProcessorNode>(processor); - processor->onSchedule(processContext.get(), sessionFactory.get()); + auto processContext = std::make_shared<core::ProcessContext>(processor_node, controller_service_provider_, repo_, flow_repo_, content_repo_); + auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext); + + processor->onSchedule(processContext, sessionFactory); std::vector<std::thread *> threads; ThreadedSchedulingAgent *agent = this; for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) { // reference the disable function from serviceNode + processor->incrementActiveTasks(); + std::function<uint64_t()> f_ex = [agent, processor, processContext, sessionFactory] () { - return agent->run(processor, processContext.get(), sessionFactory.get()); + return agent->run(processor, processContext, sessionFactory); }; + // create a functor that will be submitted to the thread pool. std::unique_ptr<TimerAwareMonitor> monitor = std::unique_ptr<TimerAwareMonitor>(new TimerAwareMonitor(&running_)); utils::Worker<uint64_t> functor(f_ex, processor->getUUIDStr(), std::move(monitor)); @@ -99,7 +103,7 @@ void ThreadedSchedulingAgent::stop() { } void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str()); if (processor->getScheduledState() != core::RUNNING) { @@ -110,6 +114,8 @@ void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> proces thread_pool_.stopTasks(processor->getUUIDStr()); processor->clearActiveTask(); + + processor->setScheduledState(core::STOPPED); } } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/TimerDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index c3aaa69..13a3439 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -29,8 +29,9 @@ namespace apache { namespace nifi { namespace minifi { -uint64_t TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) { - while (this->running_) { +uint64_t TimerDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, + const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { + while (this->running_ && processor->isRunning()) { bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) { // Honor the yield @@ -41,7 +42,7 @@ uint64_t TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> proces } return processor->getSchedulingPeriodNano() / 1000000; } - return 0; + return processor->getSchedulingPeriodNano() / 1000000; } } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/c2/C2Agent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp new file mode 100644 index 0000000..d1c71e6 --- /dev/null +++ b/libminifi/src/c2/C2Agent.cpp @@ -0,0 +1,485 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *repo + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "c2/C2Agent.h" +#include <unistd.h> +#include <csignal> +#include <utility> +#include <vector> +#include <map> +#include <string> +#include <memory> +#include "core/state/UpdateController.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, + const std::shared_ptr<Configure> &configuration) + : controller_(controller), + update_sink_(updateSink), + configuration_(configuration), + heart_beat_period_(3000), + max_c2_responses(5), + logger_(logging::LoggerFactory<C2Agent>::getLogger()) { + + running_configuration = std::make_shared<Configure>(); + + last_run_ = std::chrono::steady_clock::now(); + + configure(configuration, false); + + c2_producer_ = [&]() { + auto now = std::chrono::steady_clock::now(); + auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_run_).count(); + + // place priority on messages to send to the c2 server + if ( request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) { + if (requests.size() > 0) { + int count = 0; + do { + const C2Payload payload(std::move(requests.back())); + requests.pop_back(); + C2Payload && response = protocol_.load()->consumePayload(payload); + enqueue_c2_server_response(std::move(response)); + }while(requests.size() > 0 && ++count < max_c2_responses); + } + request_mutex.unlock(); + } + + if ( time_since > heart_beat_period_ ) { + last_run_ = now; + performHeartBeat(); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false)); + }; + + functions_.push_back(c2_producer_); + + c2_consumer_ = [&]() { + auto now = std::chrono::steady_clock::now(); + if ( queue_mutex.try_lock_until(now + std::chrono::seconds(1)) ) { + if (responses.size() > 0) { + const C2Payload payload(std::move(responses.back())); + responses.pop_back(); + extractPayload(std::move(payload)); + } + queue_mutex.unlock(); + } + return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false)); + }; + + functions_.push_back(c2_consumer_); +} + +void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconfigure) { + std::string clazz, heartbeat_period, device; + + if (!reconfigure) { + if (!configure->get("c2.agent.protocol.class", clazz)) { + clazz = "RESTSender"; + } + logger_->log_info("Class is %s", clazz); + auto protocol = core::ClassLoader::getDefaultClassLoader().instantiateRaw(clazz, clazz); + + if (protocol == nullptr) { + protocol = core::ClassLoader::getDefaultClassLoader().instantiateRaw("RESTSender", "RESTSender"); + logger_->log_info("Class is RESTSender"); + } + C2Protocol *old_protocol = protocol_.exchange(dynamic_cast<C2Protocol*>(protocol)); + + protocol_.load()->initialize(controller_, configuration_); + + if (reconfigure && old_protocol != nullptr) { + delete old_protocol; + } + } else { + protocol_.load()->update(configure); + } + + if (configure->get("c2.agent.heartbeat.period", heartbeat_period)) { + try { + heart_beat_period_ = std::stoi(heartbeat_period); + } catch (const std::invalid_argument &ie) { + heart_beat_period_ = 3000; + } + } else { + if (!reconfigure) + heart_beat_period_ = 3000; + } + + std::string heartbeat_reporters; + if (configure->get("c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) { + std::vector<std::string> reporters = utils::StringUtils::split(heartbeat_reporters, ","); + std::lock_guard<std::mutex> lock(heartbeat_mutex); + for (auto reporter : reporters) { + auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(reporter, reporter); + if (heartbeat_reporter_obj == nullptr) { + logger_->log_debug("Could not instantiate %s", reporter); + } else { + std::shared_ptr<HeartBeatReporter> shp_reporter = std::static_pointer_cast<HeartBeatReporter>(heartbeat_reporter_obj); + shp_reporter->initialize(controller_, configuration_); + heartbeat_protocols_.push_back(shp_reporter); + } + } + } +} + +void C2Agent::performHeartBeat() { + C2Payload payload(Operation::HEARTBEAT); + + logger_->log_trace("Performing heartbeat"); + + std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_copy; + { + std::lock_guard<std::timed_mutex> lock(metrics_mutex_); + if (metrics_map_.size() > 0) { + metrics_copy = std::move(metrics_map_); + } + } + + if (metrics_copy.size() > 0) { + C2Payload metrics(Operation::HEARTBEAT); + metrics.setLabel("metrics"); + + for (auto metric : metrics_copy) { + if (metric.second->serialize().size() == 0) + continue; + C2Payload child_metric_payload(Operation::HEARTBEAT); + child_metric_payload.setLabel(metric.first); + serializeMetrics(child_metric_payload, metric.first, metric.second->serialize()); + metrics.addPayload(std::move(child_metric_payload)); + } + payload.addPayload(std::move(metrics)); + } + + if (device_information_.size() > 0) { + C2Payload deviceInfo(Operation::HEARTBEAT); + deviceInfo.setLabel("DeviceInfo"); + + for (auto metric : device_information_) { + C2Payload child_metric_payload(Operation::HEARTBEAT); + child_metric_payload.setLabel(metric.first); + serializeMetrics(child_metric_payload, metric.first, metric.second->serialize()); + deviceInfo.addPayload(std::move(child_metric_payload)); + } + payload.addPayload(std::move(deviceInfo)); + } + + std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getAllComponents(); + + if (!components.empty()) { + C2ContentResponse component_payload(Operation::HEARTBEAT); + component_payload.name = "Components"; + + for (auto &component : components) { + if (component->isRunning()) { + component_payload.operation_arguments[component->getComponentName()] = "enabled"; + } else { + component_payload.operation_arguments[component->getComponentName()] = "disabled"; + } + } + payload.addContent(std::move(component_payload)); + } + + C2ContentResponse state(Operation::HEARTBEAT); + state.name = "state"; + if (update_sink_->isRunning()) { + state.operation_arguments["running"] = "true"; + } else { + state.operation_arguments["running"] = "false"; + } + state.operation_arguments["uptime"] = std::to_string(update_sink_->getUptime()); + + payload.addContent(std::move(state)); + + C2Payload && response = protocol_.load()->consumePayload(payload); + + enqueue_c2_server_response(std::move(response)); + + std::lock_guard<std::mutex> lock(heartbeat_mutex); + + for (auto reporter : heartbeat_protocols_) { + reporter->heartbeat(payload); + } +} + +void C2Agent::serializeMetrics(C2Payload &metric_payload, const std::string &name, const std::vector<state::metrics::MetricResponse> &metrics) { + for (auto metric : metrics) { + if (metric.children.size() > 0) { + C2Payload child_metric_payload(metric_payload.getOperation()); + child_metric_payload.setLabel(metric.name); + serializeMetrics(child_metric_payload, metric.name, metric.children); + + metric_payload.addPayload(std::move(child_metric_payload)); + } else { + C2ContentResponse response(metric_payload.getOperation()); + response.name = name; + + response.operation_arguments[metric.name] = metric.value; + + metric_payload.addContent(std::move(response)); + } + } +} + +void C2Agent::extractPayload(const C2Payload &&resp) { + if (resp.getStatus().getState() == state::UpdateState::NESTED) { + const std::vector<C2Payload> &payloads = resp.getNestedPayloads(); + + for (const auto &payload : payloads) { + extractPayload(std::move(payload)); + } + return; + } + switch (resp.getStatus().getState()) { + case state::UpdateState::INITIATE: + logger_->log_debug("Received initiation event from protocol"); + break; + case state::UpdateState::READ_COMPLETE: + logger_->log_trace("Received Ack from Server"); + // we have a heartbeat response. + for (const auto &server_response : resp.getContent()) { + handle_c2_server_response(server_response); + } + break; + case state::UpdateState::FULLY_APPLIED: + logger_->log_debug("Received fully applied event from protocol"); + break; + case state::UpdateState::PARTIALLY_APPLIED: + logger_->log_debug("Received partially applied event from protocol"); + break; + case state::UpdateState::NOT_APPLIED: + logger_->log_debug("Received not applied event from protocol"); + break; + case state::UpdateState::SET_ERROR: + logger_->log_debug("Received error event from protocol"); + break; + case state::UpdateState::READ_ERROR: + logger_->log_debug("Received error event from protocol"); + break; + case state::UpdateState::NESTED: // multiple updates embedded into one + + default: + logger_->log_debug("Received nested event from protocol"); + break; + } +} + +void C2Agent::extractPayload(const C2Payload &resp) { + if (resp.getStatus().getState() == state::UpdateState::NESTED) { + const std::vector<C2Payload> &payloads = resp.getNestedPayloads(); + for (const auto &payload : payloads) { + extractPayload(payload); + } + } + switch (resp.getStatus().getState()) { + case state::UpdateState::READ_COMPLETE: + // we have a heartbeat response. + for (const auto &server_response : resp.getContent()) { + handle_c2_server_response(server_response); + } + break; + default: + break; + } +} + +void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { + switch (resp.op) { + case Operation::CLEAR: + // we've been told to clear something + if (resp.name == "connection") { + logger_->log_debug("Clearing connection %s", resp.name); + for (auto connection : resp.operation_arguments) { + update_sink_->clearConnection(connection.second); + } + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + enqueue_c2_response(std::move(response)); + } else if (resp.name == "repositories") { + update_sink_->drainRepositories(); + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + enqueue_c2_response(std::move(response)); + + } else { + logger_->log_debug("Clearing unknown %s", resp.name); + } + + break; + case Operation::UPDATE: { + handle_update(resp); + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + enqueue_c2_response(std::move(response)); + } + break; + + case Operation::DESCRIBE: + handle_describe(resp); + break; + case Operation::RESTART: { + update_sink_->stop(true); + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + C2Payload && ret = protocol_.load()->consumePayload(std::move(response)); + exit(1); + } + break; + case Operation::START: + case Operation::STOP: { + if (resp.name == "C2" || resp.name == "c2") { + raise(SIGTERM); + } + + std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(resp.name); + + // stop all referenced components. + for (auto &component : components) { + logger_->log_debug("Stopping component %s", component->getComponentName()); + if (resp.op == Operation::STOP) + component->stop(true); + else + component->start(); + } + + if (resp.ident.length() > 0) { + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + enqueue_c2_response(std::move(response)); + } + } + // + break; + default: + break; + // do nothing + } +} + +/** + * Descriptions are special types of requests that require information + * to be put into the acknowledgement + */ +void C2Agent::handle_describe(const C2ContentResponse &resp) { + if (resp.name == "metrics") { + auto reporter = std::dynamic_pointer_cast<state::metrics::MetricsReporter>(update_sink_); + + if (reporter != nullptr) { + auto metricsClass = resp.operation_arguments.find("metricsClass"); + uint8_t metric_class_id = 0; + if (metricsClass != resp.operation_arguments.end()) { + // we have a class + try { + metric_class_id = std::stoi(metricsClass->second); + } catch (...) { + logger_->log_error("Could not convert %s into an integer", metricsClass->second); + } + } + + std::vector<std::shared_ptr<state::metrics::Metrics>> metrics_vec; + + reporter->getMetrics(metrics_vec, metric_class_id); + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + response.setLabel("metrics"); + for (auto metric : metrics_vec) { + serializeMetrics(response, metric->getName(), metric->serialize()); + } + enqueue_c2_response(std::move(response)); + } + + } else if (resp.name == "configuration") { + auto keys = configuration_->getConfiguredKeys(); + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + response.setLabel("configuration_options"); + C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true); + options.setLabel("configuration_options"); + std::string value; + for (auto key : keys) { + C2ContentResponse option(Operation::ACKNOWLEDGE); + option.name = key; + if (configuration_->get(key, value)) { + option.operation_arguments[key] = value; + options.addContent(std::move(option)); + } + } + response.addPayload(std::move(options)); + enqueue_c2_response(std::move(response)); + return; + } + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + enqueue_c2_response(std::move(response)); +} + +void C2Agent::handle_update(const C2ContentResponse &resp) { + // we've been told to update something + if (resp.name == "configuration") { + auto url = resp.operation_arguments.find("location"); + if (url != resp.operation_arguments.end()) { + // just get the raw data. + C2Payload payload(Operation::UPDATE, false, true); + + C2Payload &&response = protocol_.load()->consumePayload(url->second, payload, RECEIVE, false); + + if (update_sink_->applyUpdate(response.getRawData()) == 0) { + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + enqueue_c2_response(std::move(response)); + } + // send + } else { + auto update_text = resp.operation_arguments.find("configuration_data"); + if (update_text != resp.operation_arguments.end()) { + update_sink_->applyUpdate(update_text->second); + } + } + } else if (resp.name == "c2") { + // prior configuration options were already in place. thus + // we clear the map so that we don't go through replacing + // unnecessary objects. + running_configuration->clear(); + + for (auto entry : resp.operation_arguments) { + running_configuration->set(entry.first, entry.second); + } + + if (resp.operation_arguments.size() > 0) + configure(running_configuration); + } +} + +int16_t C2Agent::setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric) { + auto now = std::chrono::steady_clock::now(); + bool is_device_metric = std::dynamic_pointer_cast<state::metrics::DeviceMetric>(metric) != nullptr; + if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) { + if (is_device_metric) { + device_information_[metric->getName()] = metric; + } else { + metrics_map_[metric->getName()] = metric; + } + metrics_mutex_.unlock(); + return 0; + } + return -1; +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
