Repository: nifi-minifi-cpp Updated Branches: refs/heads/master e0d45609b -> ed8221b14
MINIFICPP-618: Add C2 triggers, first of which monitors a local file for changes MINIFICPP-624: Add alternate names for C2 configuration items and support both This closes #415. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/ed8221b1 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/ed8221b1 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/ed8221b1 Branch: refs/heads/master Commit: ed8221b14d8ba9b4d1713cb618bca6f194e6bcf7 Parents: e0d4560 Author: Marc Parisi <[email protected]> Authored: Tue Sep 25 17:45:07 2018 -0400 Committer: Aldrin Piri <[email protected]> Committed: Fri Oct 12 15:22:30 2018 -0400 ---------------------------------------------------------------------- C2.md | 54 ++++++-- extensions/http-curl/protocols/RESTReceiver.cpp | 4 +- extensions/http-curl/protocols/RESTSender.cpp | 12 +- libminifi/CMakeLists.txt | 2 +- libminifi/include/c2/C2Agent.h | 18 ++- libminifi/include/c2/C2Trigger.h | 82 ++++++++++++ .../include/c2/triggers/FileUpdateTrigger.h | 126 +++++++++++++++++++ libminifi/include/properties/Configure.h | 5 +- libminifi/include/properties/Properties.h | 20 ++- libminifi/src/Configure.cpp | 1 + libminifi/src/FlowController.cpp | 5 +- libminifi/src/Properties.cpp | 21 +++- libminifi/src/c2/C2Agent.cpp | 82 +++++++++--- libminifi/src/c2/triggers/FileUpdateTrigger.cpp | 49 ++++++++ libminifi/test/unit/FileTriggerTests.cpp | 99 +++++++++++++++ main/MiNiFiMain.cpp | 1 - 16 files changed, 536 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/C2.md ---------------------------------------------------------------------- diff --git a/C2.md b/C2.md index b41c29c..9d9068a 100644 --- a/C2.md +++ b/C2.md @@ -25,6 +25,10 @@ options defined are located in minifi.properties. - [Configuration](#configuration) - [Base Options](#base-options) - [Metrics](#metrics) + - [Protocols](#protocols) + - [Triggers](#triggers) + - [UpdatePolicies](#updatepolicies) + - [Documentation](#documentation) ## Description @@ -41,6 +45,10 @@ will be explained in greater detail in the metrics section. For more more insight into the API used within the C2 agent, please visit: https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal +Release 0.6.0: Please note that all c2 properties now exist as nifi.c2.* . If your configuration properties +files contain the former naming convention of c2.*, we will continue to support that as +an alternate key, but you are encouraged to switch your configuration options as soon as possible. + in minifi.properties @@ -51,19 +59,19 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation # specify C2 protocol -- default is RESTSender if this is not specified - c2.agent.protocol.class=RESTSender + nifi.c2.agent.protocol.class=RESTSender # may also use MQTT - # c2.agent.protocol.class=MQTTC2Protocol + # nifi.c2.agent.protocol.class=MQTTC2Protocol # control c2 heartbeat interval in millisecocnds - c2.agent.heartbeat.period=3000 + nifi.c2.agent.heartbeat.period=3000 # enable reporter classes - c2.agent.heartbeat.reporter.class=RESTReciver + nifi.c2.agent.heartbeat.reporter.class=RESTReciver # specify the rest URIs if using RESTSender - c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat - c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge + nifi.c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat + nifi.c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge # c2 agent identifier nifi.c2.agent.identifier=<your identifier> @@ -72,7 +80,7 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal nifi.c2.agent.class=<your agent class> # configure SSL Context service for REST Protocol - c2.rest.ssl.context.service + nifi.c2.rest.ssl.context.service ### Metrics @@ -287,7 +295,7 @@ will forward responses and updates to the heartbeating agents. Remote Process Groups: [] NiFi Properties Overrides: {} -### Update Policies +### UpdatePolicies Updates to MiNiFi C++ properties can be controlled through an UpdatePolicyControllerService named C2UpdatePolicy. The service supports several configuration options. They are defined in the following example: @@ -308,7 +316,31 @@ C2UpdatePolicy. The service supports several configuration options. They are def Property_3:true Property_4:true -### Update Type Descriptions + +### Triggers + + C2 Triggers can be activated to perform some C2 activity via a local event. Currently only FileUpdateTrigger exists, which monitors + for C2 File triggers to update the flow configuration. Classes can be defined as a comma separated list of classes to load via the option + nifi.c2.agent.trigger.classes + + +#### C2 File triggers + +C2 updates can be triggered with updates to a flow configuration file. It doesn't have to be the same base configuration file. It +will be copied into place. A new property, nifi.c2.file.watch, can be placed into minifi.properties to monitor. If the update time +changes while the agent is running, it will be copied into place of the file defined by nifi.flow.configuration.file. The agent +will then be restarted with the new flow configuration. If a failure occurs in reading that file or it is an invalid YAML file, the +update process will be halted. + + in minifi.properties to activate the file update trigger specify + + # specifying a trigger + nifi.c2.agent.trigger.classes=FileUpdateTrigger + nifi.c2.file.watch=<full path of file to monitor> + + + +## Documentation Type descriptions ( class descriptions entered in PROCESSORS.md ) can be automatically placed within C2 by building cmake with the following flag: @@ -320,6 +352,4 @@ the following flag: When cmake is instantiated with this, a build will re-generate the type descriptions from PROCESSORS.md. Once this is finished you may re-build the project with the following command from the build directory, running the build as you normally would: - cmake -DBOOTSTRAP= .. - - \ No newline at end of file + cmake -DBOOTSTRAP= .. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/extensions/http-curl/protocols/RESTReceiver.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp index d2c17b6..babc983 100644 --- a/extensions/http-curl/protocols/RESTReceiver.cpp +++ b/extensions/http-curl/protocols/RESTReceiver.cpp @@ -49,8 +49,8 @@ void RESTReceiver::initialize(const std::shared_ptr<core::controller::Controller logger_->log_trace("Initializing rest receiver"); if (nullptr != configuration_) { std::string listeningPort,rootUri="/", caCert; - configuration_->get("c2.rest.listener.port", listeningPort); - configuration_->get("c2.rest.listener.cacert", caCert); + configuration_->get("nifi.c2.rest.listener.port","c2.rest.listener.port", listeningPort); + configuration_->get("nifi.c2.rest.listener.cacert","c2.rest.listener.cacert", caCert); if (!listeningPort.empty() && !rootUri.empty()) { handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol()); if (!caCert.empty()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/extensions/http-curl/protocols/RESTSender.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp index 7db998a..9b4ce5e 100644 --- a/extensions/http-curl/protocols/RESTSender.cpp +++ b/extensions/http-curl/protocols/RESTSender.cpp @@ -46,15 +46,15 @@ void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerSe // base URL when one is not specified. if (nullptr != configure) { std::string update_str, ssl_context_service_str; - configure->get("c2.rest.url", rest_uri_); - configure->get("c2.rest.url.ack", ack_uri_); - if (configure->get("c2.rest.ssl.context.service", ssl_context_service_str)) { + configure->get("nifi.c2.rest.url","c2.rest.url", rest_uri_); + configure->get("nifi.c2.rest.url.ack","c2.rest.url.ack", ack_uri_); + if (configure->get("nifi.c2.rest.ssl.context.service","c2.rest.ssl.context.service", ssl_context_service_str)) { auto service = controller->getControllerService(ssl_context_service_str); if (nullptr != service) { ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service); } } - configure->get("c2.rest.heartbeat.minimize.updates", update_str); + configure->get("nifi.c2.rest.heartbeat.minimize.updates","c2.rest.heartbeat.minimize.updates", update_str); utils::StringUtils::StringToBool(update_str, minimize_updates_); } logger_->log_debug("Submitting to %s", rest_uri_); @@ -78,8 +78,8 @@ C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction directi void RESTSender::update(const std::shared_ptr<Configure> &configure) { std::string url; - configure->get("c2.rest.url", url); - configure->get("c2.rest.url.ack", url); + configure->get("nifi.c2.rest.url","c2.rest.url", url); + configure->get("nifi.c2.rest.url.ack","c2.rest.url.ack", url); } const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 1f556d0..11eec1c 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -97,7 +97,7 @@ find_package(OpenSSL) if (OPENSSL_FOUND) set(TLS_SOURCES "src/io/tls/*.cpp") endif(OPENSSL_FOUND) -file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp") +file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp") file(GLOB PROCESSOR_SOURCES "src/processors/*.cpp" ) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/c2/C2Agent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index e9ff4e4..cc575a2 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -30,6 +30,7 @@ #include "controllers/UpdatePolicyControllerService.h" #include "core/state/Value.h" #include "C2Payload.h" +#include "C2Trigger.h" #include "C2Protocol.h" #include "io/validation.h" #include "HeartBeatReporter.h" @@ -90,11 +91,23 @@ class C2Agent : public state::UpdateController, public state::response::Response protected: + /** + * Restarts this agent. + */ void restart_agent(); + /** + * Update agent per the provided C2 update from c2 server or triggers + */ void update_agent(); /** + * Check the collection of triggers for any updates that need to be handled. + * This is an optional step + */ + void checkTriggers(); + + /** * Configure the C2 agent */ void configure(const std::shared_ptr<Configure> &configure, bool reconfigure = true); @@ -212,6 +225,8 @@ class C2Agent : public state::UpdateController, public state::response::Response std::vector<std::shared_ptr<HeartBeatReporter>> heartbeat_protocols_; + std::vector<std::shared_ptr<C2Trigger>> triggers_; + std::atomic<C2Protocol*> protocol_; bool allow_updates_; @@ -223,8 +238,7 @@ class C2Agent : public state::UpdateController, public state::response::Response std::string bin_location_; std::shared_ptr<logging::Logger> logger_; -} -; +}; } /* namesapce c2 */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/c2/C2Trigger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Trigger.h b/libminifi/include/c2/C2Trigger.h new file mode 100644 index 0000000..87f33f9 --- /dev/null +++ b/libminifi/include/c2/C2Trigger.h @@ -0,0 +1,82 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ +#define LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ + +#include "core/Connectable.h" +#include "c2/C2Payload.h" +#include "properties/Configure.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose: Defines basic triggering mechanism for command and control interfaces + * + * Design: Extends Connectable so that we can instantiate with the class name + * + * The state machine expects triggered (yes ) -> getAction -> reset(optional) + */ +class C2Trigger : public core::Connectable{ + public: + + C2Trigger(std::string name, utils::Identifier uuid) + : core::Connectable(name, uuid){ + + } + virtual ~C2Trigger() { + } + + + /** + * initializes trigger with minifi configuration. + */ + virtual void initialize(const std::shared_ptr<minifi::Configure> &configuration) = 0; + /** + * returns true if triggered, false otherwise. calling this function multiple times + * may change internal state. + */ + virtual bool triggered() = 0; + + /** + * Resets actions once they have been triggered. The flow of events does not require + * this to occur after an action has been triggered. Instead this is optional + * and a feature available to potential triggers that require a reset. + * + * This will occur because there are times in which the C2Action may take a significant + * amount of time and a reset is in order to avoid continual triggering. + */ + virtual void reset() = 0; + + /** + * Returns a payload implementing a C2 action. May or may not reset the action. + * @return C2Payload of the action to perform. + */ + virtual C2Payload getAction() = 0; +}; + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/c2/triggers/FileUpdateTrigger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/triggers/FileUpdateTrigger.h b/libminifi/include/c2/triggers/FileUpdateTrigger.h new file mode 100644 index 0000000..031a245 --- /dev/null +++ b/libminifi/include/c2/triggers/FileUpdateTrigger.h @@ -0,0 +1,126 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ +#define LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ +#include <atomic> +#include "c2/C2Trigger.h" +#include "utils/StringUtils.h" +#include "utils/file/FileUtils.h" +#include "core/Resource.h" +#include "c2/C2Payload.h" +#include "properties/Configure.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose: Defines a file update trigger when the last write time of a file has been changed. + * Design: Extends C2Trigger, and implements a trigger, action, reset state machine. Calling + * triggered will check the file. + */ +class FileUpdateTrigger : public C2Trigger { + public: + + FileUpdateTrigger(std::string name, utils::Identifier uuid = utils::Identifier()) + : C2Trigger(name, uuid), + last_update_(0), + update_(false), + logger_(logging::LoggerFactory<FileUpdateTrigger>::getLogger()) { + } + + void initialize(const std::shared_ptr<minifi::Configure> &configuration) { + if (nullptr != configuration) { + if (configuration->get(minifi::Configure::nifi_c2_file_watch, "c2.file.watch", file_)) { + last_update_ = utils::file::FileUtils::last_write_time(file_); + } else { + logger_->log_trace("Could not configure file"); + } + + } + } + + virtual bool triggered() { + if (last_update_ == 0) { + logger_->log_trace("Last Update is zero"); + return false; + } + auto update_time = utils::file::FileUtils::last_write_time(file_); + logger_->log_trace("Last Update is %d and update time is %d", last_update_.load(), update_time); + if (update_time > last_update_) { + last_update_ = update_time; + update_ = true; + return true; + } + return false; + } + + virtual void reset() { + // reset the last write time + last_update_ = utils::file::FileUtils::last_write_time(file_); + update_ = false; + } + + /** + * Returns an update payload implementing a C2 action + */ + virtual C2Payload getAction(); + + /** + * Determines if we are connected and operating + */ + virtual bool isRunning() { + return true; + } + + /** + * Block until work is available on any input connection, or the given duration elapses + * @param timeoutMs timeout in milliseconds + */ + + virtual void yield() { + + } + + /** + * Determines if work is available by this connectable + * @return boolean if work is available. + */ + virtual bool isWorkAvailable() { + return true; + } + + protected: + std::string file_; + std::atomic<uint64_t> last_update_; + std::atomic<bool> update_; + private: + std::shared_ptr<logging::Logger> logger_; +}; +// add the trigger to the known resources. +REGISTER_RESOURCE(FileUpdateTrigger) + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 872c35d..4fb68dc 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -41,7 +41,6 @@ class Configure : public Properties { } // nifi.flow.configuration.file static const char *nifi_default_directory; - static const char *nifi_c2_enable; static const char *nifi_flow_configuration_file; static const char *nifi_flow_configuration_file_backup_update; static const char *nifi_flow_engine_threads; @@ -78,6 +77,10 @@ class Configure : public Properties { // nifi rest api user name and password static const char *nifi_rest_api_user_name; static const char *nifi_rest_api_password; + // c2 options + + static const char *nifi_c2_enable; + static const char *nifi_c2_file_watch; private: std::string agent_identifier_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/include/properties/Properties.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h index ec0ca5d..eadb77d 100644 --- a/libminifi/include/properties/Properties.h +++ b/libminifi/include/properties/Properties.h @@ -57,8 +57,24 @@ class Properties { std::lock_guard<std::mutex> lock(mutex_); return (properties_.find(key) != properties_.end()); } - // Get the config value - bool get(std::string key, std::string &value); + /** + * Returns the config value by placing it into the referenced param value + * @param key key to look up + * @param value value in which to place the map's stored property value + * @returns true if found, false otherwise. + */ + bool get(const std::string &key, std::string &value); + + /** + * Returns the config value by placing it into the referenced param value + * Uses alternate_key if key is not found within the map. + * + * @param key key to look up + * @param alternate_key is the secondary lookup key if key is not found + * @param value value in which to place the map's stored property value + * @returns true if found, false otherwise. + */ + bool get(const std::string &key, const std::string &alternate_key, std::string &value); /** * Returns the configuration value or an empty string. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 21cce95..4bcf315 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -55,6 +55,7 @@ const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client. const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate"; const char *Configure::nifi_rest_api_user_name = "nifi.rest.api.user.name"; const char *Configure::nifi_rest_api_password = "nifi.rest.api.password"; +const char *Configure::nifi_c2_file_watch = "nifi.c2.file.watch"; } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index c840f14..9206f41 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -54,7 +54,6 @@ #include "core/Connectable.h" #include "utils/HTTPClient.h" - #ifdef _MSC_VER #ifndef PATH_MAX #define PATH_MAX 260 @@ -189,7 +188,7 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st this->root_ = std::move(newRoot); loadFlowRepo(); initialized_ = true; - bool started = start(); + bool started = start() == 0; updating_ = false; @@ -358,7 +357,7 @@ void FlowController::initializeC2() { std::string c2_enable_str; - if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) { + if (configuration_->get(Configure::nifi_c2_enable, "c2.enable", c2_enable_str)) { bool enable_c2 = true; utils::StringUtils::StringToBool(c2_enable_str, enable_c2); c2_enabled_ = enable_c2; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/Properties.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp index f5537fc..e64a92f 100644 --- a/libminifi/src/Properties.cpp +++ b/libminifi/src/Properties.cpp @@ -33,7 +33,7 @@ Properties::Properties() } // Get the config value -bool Properties::get(std::string key, std::string &value) { +bool Properties::get(const std::string &key, std::string &value) { std::lock_guard<std::mutex> lock(mutex_); auto it = properties_.find(key); @@ -45,6 +45,25 @@ bool Properties::get(std::string key, std::string &value) { } } +bool Properties::get(const std::string &key, const std::string &alternate_key, std::string &value) { + std::lock_guard<std::mutex> lock(mutex_); + auto it = properties_.find(key); + + if (it == properties_.end()) { + it = properties_.find(alternate_key); + if (it != properties_.end()) { + logger_->log_warn("%s is an alternate property that may not be supported in future releases. Please use %s instead.", alternate_key, key); + } + } + + if (it != properties_.end()) { + value = it->second; + return true; + } else { + return false; + } +} + int Properties::getInt(const std::string &key, int default_value) { std::lock_guard<std::mutex> lock(mutex_); auto it = properties_.find(key); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/c2/C2Agent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 10d8d29..8db6894 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -62,7 +62,7 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_run_).count(); // place priority on messages to send to the c2 server - if ( request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) { + if ( protocol_ != nullptr && request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) { if (requests.size() > 0) { int count = 0; do { @@ -80,6 +80,8 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid performHeartBeat(); } + checkTriggers(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false)); }; @@ -102,11 +104,31 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid functions_.push_back(c2_consumer_); } +void C2Agent::checkTriggers() { + logger_->log_info("Checking %d triggers", triggers_.size()); + for (const auto &trigger : triggers_) { + if (trigger->triggered()) { + /** + * Action was triggered, so extract it. + */ + C2Payload &&triggerAction = trigger->getAction(); + logger_->log_trace("%s action triggered", trigger->getName()); + // handle the response the same way. This means that + // acknowledgements will be sent to the c2 server for every trigger action. + // this is expected + extractPayload(std::move(triggerAction)); + // call reset if the trigger supports this activity + trigger->reset(); + } else { + logger_->log_trace("%s action not triggered", trigger->getName()); + } + } +} void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconfigure) { std::string clazz, heartbeat_period, device; if (!reconfigure) { - if (!configure->get("c2.agent.protocol.class", clazz)) { + if (!configure->get("nifi.c2.agent.protocol.class", "c2.agent.protocol.class", clazz)) { clazz = "RESTSender"; } logger_->log_info("Class is %s", clazz); @@ -132,7 +154,7 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf protocol_.load()->update(configure); } - if (configure->get("c2.agent.heartbeat.period", heartbeat_period)) { + if (configure->get("nifi.c2.agent.heartbeat.period", "c2.agent.heartbeat.period", heartbeat_period)) { try { heart_beat_period_ = std::stoi(heartbeat_period); } catch (const std::invalid_argument &ie) { @@ -144,12 +166,12 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf } std::string update_settings; - if (configure->get("c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) { + if (configure->get("nifi.c2.agent.update.allow", "c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) { // allow the agent to be updated. we then need to get an update command to execute after } if (allow_updates_) { - if (!configure->get("c2.agent.update.command", update_command_)) { + if (!configure->get("nifi.c2.agent.update.command", "c2.agent.update.command", update_command_)) { char cwd[1024]; getcwd(cwd, sizeof(cwd)); @@ -158,7 +180,7 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf update_command_ = command.str(); } - if (!configure->get("c2.agent.update.temp.location", update_location_)) { + if (!configure->get("nifi.c2.agent.update.temp.location", "c2.agent.update.temp.location", update_location_)) { char cwd[1024]; getcwd(cwd, sizeof(cwd)); @@ -169,10 +191,10 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf } // if not defined we won't beable to update - configure->get("c2.agent.bin.location", bin_location_); + configure->get("nifi.c2.agent.bin.location", "c2.agent.bin.location", bin_location_); } std::string heartbeat_reporters; - if (configure->get("c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) { + if (configure->get("nifi.c2.agent.heartbeat.reporter.classes", "c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) { std::vector<std::string> reporters = utils::StringUtils::split(heartbeat_reporters, ","); std::lock_guard<std::mutex> lock(heartbeat_mutex); for (auto reporter : reporters) { @@ -187,6 +209,22 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf } } + std::string trigger_classes; + if (configure->get("nifi.c2.agent.trigger.classes", "c2.agent.trigger.classes", trigger_classes)) { + std::vector<std::string> triggers = utils::StringUtils::split(trigger_classes, ","); + std::lock_guard<std::mutex> lock(heartbeat_mutex); + for (auto trigger : triggers) { + auto trigger_obj = core::ClassLoader::getDefaultClassLoader().instantiate(trigger, trigger); + if (trigger_obj == nullptr) { + logger_->log_debug("Could not instantiate %s", trigger); + } else { + std::shared_ptr<C2Trigger> trg_impl = std::static_pointer_cast<C2Trigger>(trigger_obj); + trg_impl->initialize(configuration_); + triggers_.push_back(trg_impl); + } + } + } + auto base_reporter = "ControllerSocketProtocol"; auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(base_reporter, base_reporter); if (heartbeat_reporter_obj == nullptr) { @@ -514,23 +552,38 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { // just get the raw data. C2Payload payload(Operation::TRANSFER, false, true); - C2Payload &&response = protocol_.load()->consumePayload(url->second.to_string(), payload, RECEIVE, false); + auto urlStr = url->second.to_string(); - auto raw_data = response.getRawData(); - std::string file_path = std::string(raw_data.data(), raw_data.size()); + std::string file_path = urlStr; + if (nullptr != protocol_ && file_path.find("http") != std::string::npos) { + C2Payload &&response = protocol_.load()->consumePayload(urlStr, payload, RECEIVE, false); + + auto raw_data = response.getRawData(); + file_path = std::string(raw_data.data(), raw_data.size()); + } std::ifstream new_conf(file_path); std::string raw_data_str((std::istreambuf_iterator<char>(new_conf)), std::istreambuf_iterator<char>()); unlink(file_path.c_str()); // if we can apply the update, we will acknowledge it and then backup the configuration file. - if (update_sink_->applyUpdate(url->second.to_string(), raw_data_str)) { + if (update_sink_->applyUpdate(urlStr, raw_data_str)) { C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); enqueue_c2_response(std::move(response)); if (persist != resp.operation_arguments.end() && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) { // update nifi.flow.configuration.file=./conf/config.yml std::string config_file; + configuration_->get(minifi::Configure::nifi_flow_configuration_file, config_file); + std::string adjustedFilename; + if (config_file[0] != '/') { + adjustedFilename = adjustedFilename + configuration_->getHome() + "/" + config_file; + } else { + adjustedFilename += config_file; + } + + config_file = adjustedFilename; + std::stringstream config_file_backup; config_file_backup << config_file << ".bak"; // we must be able to successfuly copy the file. @@ -540,14 +593,15 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { if (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update, backup_config) && utils::StringUtils::StringToBool(backup_config, backup_file)) { if (utils::file::FileUtils::copy_file(config_file, config_file_backup.str()) != 0) { + logger_->log_debug("Cannot copy %s to %s", config_file, config_file_backup.str()); persist_config = false; } } + logger_->log_debug("Copy %s to %s %d", config_file, config_file_backup.str(), persist_config); if (persist_config) { std::ofstream writer(config_file); if (writer.is_open()) { - auto output = response.getRawData(); - writer.write(output.data(), output.size()); + writer.write(raw_data_str.data(), raw_data_str.size()); } writer.close(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/src/c2/triggers/FileUpdateTrigger.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp new file mode 100644 index 0000000..84cf0d3 --- /dev/null +++ b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp @@ -0,0 +1,49 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "c2/triggers/FileUpdateTrigger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Returns a payload implementing a C2 action + */ +C2Payload FileUpdateTrigger::getAction() { + if (update_) { + C2Payload response_payload(Operation::UPDATE, state::UpdateState::READ_COMPLETE, true, true); + C2ContentResponse resp(Operation::UPDATE); + resp.ident = "triggered"; + resp.name = "configuration"; + resp.operation_arguments["location"] = file_; + resp.operation_arguments["persist"] = "true"; + response_payload.addContent(std::move(resp)); + update_ = false; + return response_payload; + } + C2Payload response_payload(Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true, true); + return response_payload; +} +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/libminifi/test/unit/FileTriggerTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/FileTriggerTests.cpp b/libminifi/test/unit/FileTriggerTests.cpp new file mode 100644 index 0000000..c05327c --- /dev/null +++ b/libminifi/test/unit/FileTriggerTests.cpp @@ -0,0 +1,99 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <chrono> +#include <thread> +#include <uuid/uuid.h> +#include <memory> + +#include "c2/triggers/FileUpdateTrigger.h" +#include "../TestBase.h" +#include "io/ClientSocket.h" +#include "core/Processor.h" +#include "core/ClassLoader.h" +#include "core/yaml/YamlConfiguration.h" + +TEST_CASE("Empty file", "[t1]") { + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + trigger.initialize(configuration); + + REQUIRE(false == trigger.triggered()); + REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation()); +} + +TEST_CASE("invalidfile file", "[t2]") { + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + configuration->set(minifi::Configure::nifi_c2_file_watch, "/tmp/blahblahblhalbha"); + trigger.initialize(configuration); + + REQUIRE(false == trigger.triggered()); + REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation()); +} + +TEST_CASE("test valid file no update", "[t3]") { + TestController testController; + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + configuration->set(minifi::Configure::nifi_c2_file_watch, path); + trigger.initialize(configuration); + + REQUIRE(false == trigger.triggered()); + REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation()); +} + +TEST_CASE("test valid file update", "[t4]") { + TestController testController; + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + configuration->set(minifi::Configure::nifi_c2_file_watch, path); + trigger.initialize(configuration); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + file.open(path, std::ios::out); + file << "tempFiles"; + file.close(); + + REQUIRE(true == trigger.triggered()); + + REQUIRE(minifi::c2::Operation::UPDATE == trigger.getAction().getOperation()); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed8221b1/main/MiNiFiMain.cpp ---------------------------------------------------------------------- diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index a99c219..23c7e70 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -36,7 +36,6 @@ #include <vector> #include <queue> #include <map> -#include <yaml-cpp/yaml.h> #include <iostream> #include "ResourceClaim.h" #include "core/Core.h"
