This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 2346489 MINIFICPP-1439 - Startup without agent class, handle class
update
2346489 is described below
commit 2346489446aff2868e2fd39f6a8e3585624f6a3e
Author: Adam Debreceni <[email protected]>
AuthorDate: Fri Jan 8 16:39:58 2021 +0100
MINIFICPP-1439 - Startup without agent class, handle class update
Signed-off-by: Arpad Boda <[email protected]>
This closes #969
---
extensions/coap/tests/CoapIntegrationBase.h | 8 +-
extensions/http-curl/tests/C2RequestClassTest.cpp | 139 +++++++++++++++++++++
extensions/http-curl/tests/CMakeLists.txt | 1 +
extensions/http-curl/tests/HTTPIntegrationBase.h | 38 ++++--
.../http-curl/tests/VerifyInvokeHTTPTest.cpp | 23 +++-
libminifi/include/c2/C2Agent.h | 2 +-
libminifi/include/c2/C2Payload.h | 17 ++-
.../AgentIdentificationProvider.h} | 33 +++--
.../include/core/state/nodes/AgentInformation.h | 29 +++--
libminifi/include/properties/Configuration.h | 26 +---
libminifi/include/properties/Configure.h | 9 +-
libminifi/src/Configure.cpp | 22 ++++
libminifi/src/FlowController.cpp | 3 +-
libminifi/src/c2/C2Agent.cpp | 30 +++--
libminifi/src/c2/C2Client.cpp | 19 +--
libminifi/src/c2/protocols/RESTProtocol.cpp | 101 ++++++++-------
libminifi/test/integration/IntegrationBase.h | 10 +-
17 files changed, 349 insertions(+), 161 deletions(-)
diff --git a/extensions/coap/tests/CoapIntegrationBase.h
b/extensions/coap/tests/CoapIntegrationBase.h
index 6dce8d5..6ccee57 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -44,13 +44,15 @@ class CoapIntegrationBase : public IntegrationBase {
server.reset();
}
- void run(const std::string& test_file_location, const
utils::optional<std::string>& bootstrap_file = {}) override {
+ void run(const utils::optional<std::string>& test_file_location = {}, const
utils::optional<std::string>& bootstrap_file = {}) override {
testSetup();
std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
std::shared_ptr<core::Repository> test_flow_repo =
std::make_shared<TestFlowRepository>();
- configuration->set(minifi::Configure::nifi_flow_configuration_file,
test_file_location);
+ if (test_file_location) {
+ configuration->set(minifi::Configure::nifi_flow_configuration_file,
*test_file_location);
+ }
configuration->set("c2.agent.heartbeat.period", "200");
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -87,7 +89,7 @@ class CoapIntegrationBase : public IntegrationBase {
};
void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
-
+ std::string path;
parse_http_components(url, port, scheme, path);
CivetCallbacks callback{};
if (url.find("localhost") != std::string::npos) {
diff --git a/extensions/http-curl/tests/C2RequestClassTest.cpp
b/extensions/http-curl/tests/C2RequestClassTest.cpp
new file mode 100644
index 0000000..7521eb0
--- /dev/null
+++ b/extensions/http-curl/tests/C2RequestClassTest.cpp
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <vector>
+#include <string>
+
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
+#include "CivetStream.h"
+#include "StreamPipe.h"
+#include "OptionalUtils.h"
+
+/**
+ * Test handler for the C2 acknowledge endpoint.
+ *
+ * Records the "operationId" of every acknowledgement POSTed to it so that a
+ * test can later ask whether a given operation has been acknowledged.
+ */
+class C2AcknowledgeHandler : public ServerAwareHandler {
+ public:
+  /**
+   * Parses the request body as JSON and, when it is an object carrying a
+   * string "operationId", remembers that id. Always answers with an empty
+   * 200 response and reports the request as handled.
+   */
+  bool handlePost(CivetServer *server, struct mg_connection *conn) override {
+    std::string req = readPayload(conn);
+    rapidjson::Document root;
+    root.Parse(req.data(), req.size());
+    // rapidjson requires the value to be an object before HasMember() and a
+    // string before GetString(); check both so that an empty or malformed
+    // body is ignored instead of tripping an assertion inside the handler.
+    if (!root.HasParseError() && root.IsObject() &&
+        root.HasMember("operationId") && root["operationId"].IsString()) {
+      std::lock_guard<std::mutex> guard(mtx_);
+      acknowledged_operations_.insert(root["operationId"].GetString());
+    }
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                    "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    return true;
+  }
+
+  /**
+   * @return true if an acknowledgement with the given operation id has been received.
+   */
+  bool isAcknowledged(const std::string& operation_id) const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return acknowledged_operations_.count(operation_id) > 0;
+  }
+
+ private:
+  // guards acknowledged_operations_; mutable so that isAcknowledged() can stay const
+  mutable std::mutex mtx_;
+  std::set<std::string> acknowledged_operations_;
+};
+
+/**
+ * Test handler for the C2 heartbeat endpoint.
+ *
+ * Remembers, in arrival order, the agent class reported by each heartbeat
+ * (or an empty optional when none was reported) and answers every heartbeat
+ * with the fixed response body given at construction.
+ */
+class C2HeartbeatHandler : public ServerAwareHandler {
+ public:
+  /**
+   * @param response body sent back verbatim for every heartbeat
+   */
+  explicit C2HeartbeatHandler(std::string response) : response_(std::move(response)) {}
+
+  /**
+   * Extracts agentInfo.agentClass from the JSON heartbeat, appends it to the
+   * list of seen classes and replies with the preconfigured response.
+   */
+  bool handlePost(CivetServer *server, struct mg_connection *conn) override {
+    std::string req = readPayload(conn);
+    rapidjson::Document root;
+    root.Parse(req.data(), req.size());
+    utils::optional<std::string> agent_class;
+    // operator[] / HasMember() / GetString() require the matching JSON type,
+    // so validate the shape step by step; a heartbeat without a usable
+    // agentInfo.agentClass is recorded as "no class".
+    if (!root.HasParseError() && root.IsObject() && root.HasMember("agentInfo")) {
+      const auto& agent_info = root["agentInfo"];
+      if (agent_info.IsObject() && agent_info.HasMember("agentClass") && agent_info["agentClass"].IsString()) {
+        agent_class = agent_info["agentClass"].GetString();
+      }
+    }
+    {
+      std::lock_guard<std::mutex> lock(mtx_);
+      classes_.push_back(agent_class);
+    }
+
+    // %lu expects unsigned long; size_t is a different type on some
+    // platforms (e.g. 64-bit Windows), so convert explicitly.
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                    "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              static_cast<unsigned long>(response_.length()));
+    mg_printf(conn, "%s", response_.c_str());
+    return true;
+  }
+
+  /**
+   * Checks whether the given classes were reported in the given order, i.e.
+   * whether class_names is a (not necessarily contiguous) subsequence of the
+   * classes seen so far.
+   *
+   * @return true if every entry of class_names was found after the match of the previous one
+   */
+  bool gotClassesInOrder(const std::vector<utils::optional<std::string>>& class_names) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    auto it = classes_.begin();
+    for (const auto& class_name : class_names) {
+      // continue searching from just past the previous match; restarting from
+      // begin() would accept the classes in any order
+      it = std::find(it, classes_.end(), class_name);
+      if (it == classes_.end()) {
+        return false;
+      }
+      ++it;
+    }
+    return true;
+  }
+
+ private:
+  // guards classes_, which is written by the server thread and read by the test
+  std::mutex mtx_;
+  std::vector<utils::optional<std::string>> classes_;
+  std::string response_;
+};
+
+// Integration harness that enables C2 with the REST sender and succeeds when
+// the caller-supplied predicate is reported as having happened.
+class VerifyC2ClassRequest : public VerifyC2Base {
+ public:
+ // verify: predicate evaluated by runAssertions(); moved into the harness.
+ explicit VerifyC2ClassRequest(std::function<bool()> verify) :
verify_(std::move(verify)) {}
+
+ // Sets the C2 properties for this test: REST protocol class, C2 enabled,
+ // heartbeat period "100" and the root classes included in the heartbeat.
+ void configureC2() override {
+ configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
+ configuration->set("nifi.c2.enable", "true");
+ configuration->set("nifi.c2.agent.heartbeat.period", "100");
+ configuration->set("nifi.c2.root.classes",
"DeviceInfoNode,AgentInformation,FlowInformation");
+ }
+
+ // Asserts that verify_ is satisfied according to
+ // utils::verifyEventHappenedInPollTime with a 3 second limit.
+ void runAssertions() override {
+ assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(3),
verify_));
+ }
+
+ private:
+ std::function<bool()> verify_;
+};
+
+// Test entry point: starts an agent without a flow file, answers its
+// heartbeats with a "properties" update that sets nifi.c2.agent.class to
+// "TestClass" (persist: true), and waits for the harness predicate below.
+int main() {
+ const std::string class_update_id = "321";
+ // The heartbeat response is a JSON document with a single requested
+ // operation whose operationId is class_update_id.
+ C2HeartbeatHandler heartbeat_handler(R"({
+ "requested_operations": [{
+ "operation": "update",
+ "name": "properties",
+ "operationId": ")" + class_update_id + R"(",
+ "args": {
+ "nifi.c2.agent.class": {"value": "TestClass", "persist": true}
+ }
+ }]})");
+ C2AcknowledgeHandler ack_handler;
+
+ // Passes once the heartbeat handler reports an empty class and "TestClass"
+ // via gotClassesInOrder() and the update operation has been acknowledged.
+ VerifyC2ClassRequest harness([&]() -> bool {
+ return heartbeat_handler.gotClassesInOrder({{}, {"TestClass"}}) &&
+ ack_handler.isAcknowledged(class_update_id);
+ });
+ // Both handlers are registered through setUrl with port 0; setC2Url then
+ // sets the heartbeat and acknowledge URLs the agent is configured with.
+ harness.setUrl("http://localhost:0/api/heartbeat", &heartbeat_handler);
+ harness.setUrl("http://localhost:0/api/acknowledge", &ack_handler);
+ harness.setC2Url("/api/heartbeat", "/api/acknowledge");
+
+ // No flow configuration file is passed: run() is called with its defaults.
+ harness.run();
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt
b/extensions/http-curl/tests/CMakeLists.txt
index 2378862..04a9627 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -78,6 +78,7 @@ add_test(NAME C2DescribeCoreComponentStateTest COMMAND
C2DescribeCoreComponentSt
add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest
"${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest
"${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/"
"${TEST_RESOURCES}/TestBad.yml")
add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration
"${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2RequestClassTest COMMAND C2RequestClassTest)
if(NOT OPENSSL_OFF)
add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest
"${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/")
add_test(NAME C2VerifyHeartbeatAndStopSecure COMMAND C2VerifyHeartbeatAndStop
"${TEST_RESOURCES}/C2VerifyHeartbeatAndStopSecure.yml" "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h
b/extensions/http-curl/tests/HTTPIntegrationBase.h
index cbdbdd3..48724bb 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -43,7 +43,9 @@ public:
server(nullptr) {
}
- void setUrl(const std::string &url, ServerAwareHandler *handler);
+ virtual void setUrl(const std::string &url, ServerAwareHandler *handler);
+
+ void setC2Url(const std::string& heartbeat_path, const std::string&
acknowledge_path);
void shutdownBeforeFlowController() override {
server.reset();
@@ -68,21 +70,31 @@ public:
};
void HTTPIntegrationBase::setUrl(const std::string &url, ServerAwareHandler
*handler) {
- parse_http_components(url, port, scheme, path);
- CivetCallbacks callback{};
+ std::string url_port, url_scheme, url_path;
+ parse_http_components(url, url_port, url_scheme, url_path);
if (server) {
- server->addHandler(path, handler);
+ if (url_port != "0" && url_port != port) {
+ throw std::logic_error("Inconsistent port requirements");
+ }
+ if (url_scheme != scheme) {
+ throw std::logic_error("Inconsistent scheme requirements");
+ }
+ server->addHandler(url_path, handler);
return;
}
+ // initialize server
+ scheme = url_scheme;
+ port = url_port;
+ CivetCallbacks callback{};
if (scheme == "https" && !key_dir.empty()) {
std::string cert = key_dir + "nifi-cert.pem";
memset(&callback, 0, sizeof(callback));
callback.init_ssl = ssl_enable;
port += "s";
callback.log_message = log_message;
- server = utils::make_unique<TestServer>(port, path, handler, &callback,
cert, cert);
+ server = utils::make_unique<TestServer>(port, url_path, handler,
&callback, cert, cert);
} else {
- server = utils::make_unique<TestServer>(port, path, handler);
+ server = utils::make_unique<TestServer>(port, url_path, handler);
}
bool secure{false};
if (port == "0" || port == "0s") {
@@ -92,13 +104,23 @@ void HTTPIntegrationBase::setUrl(const std::string &url,
ServerAwareHandler *han
port += "s";
}
}
- std::string c2_url = std::string("http") + (secure ? "s" : "") +
"://localhost:" + getWebPort() + path;
+ std::string c2_url = std::string("http") + (secure ? "s" : "") +
"://localhost:" + getWebPort() + url_path;
configuration->set("nifi.c2.rest.url", c2_url);
configuration->set("nifi.c2.rest.url.ack", c2_url);
}
+// Sets "nifi.c2.rest.url" and "nifi.c2.rest.url.ack" to the given paths on
+// localhost at the port returned by getWebPort().
+// Throws std::logic_error when `port` is still empty.
+void HTTPIntegrationBase::setC2Url(const std::string &heartbeat_path, const
std::string &acknowledge_path) {
+ if (port.empty()) {
+ throw std::logic_error("Port is not yet initialized");
+ }
+ // a trailing 's' on port selects the https scheme for the generated URLs
+ bool secure = port.back() == 's';
+ std::string base = std::string("http") + (secure ? "s" : "") +
"://localhost:" + getWebPort();
+ configuration->set("nifi.c2.rest.url", base + heartbeat_path);
+ configuration->set("nifi.c2.rest.url.ack", base + acknowledge_path);
+}
+
class VerifyC2Base : public HTTPIntegrationBase {
-public:
+ public:
void testSetup() override {
LogTestController::getInstance().setDebug<utils::HTTPClient>();
LogTestController::getInstance().setDebug<LogTestController>();
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index 42b1b9c..d6422e3 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -44,18 +44,30 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
void cleanup() override {
}
+ // Accepts exactly one URL: remembers its path component in path_ and then
+ // delegates to HTTPIntegrationBase::setUrl. A second call throws
+ // std::logic_error("Url is already set").
+ void setUrl(const std::string &url, ServerAwareHandler *handler) override {
+ if (path_) {
+ throw std::logic_error("Url is already set");
+ }
+ // NOTE(review): these locals look like they shadow the inherited
+ // port/scheme members; only the parsed path is kept here - confirm.
+ std::string port, scheme, path;
+ parse_http_components(url, port, scheme, path);
+ path_ = path;
+ HTTPIntegrationBase::setUrl(url, handler);
+ }
+
void setProperties(std::shared_ptr<core::Processor> proc) {
- std::string url = scheme + "://localhost:" + getWebPort() + path;
+ std::string url = scheme + "://localhost:" + getWebPort() + *path_;
proc->setProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
}
- void setupFlow(const std::string& flow_yml_path) {
+ void setupFlow(const utils::optional<std::string>& flow_yml_path) {
testSetup();
std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
std::shared_ptr<core::Repository> test_flow_repo =
std::make_shared<TestFlowRepository>();
- configuration->set(minifi::Configure::nifi_flow_configuration_file,
flow_yml_path);
+ if (flow_yml_path) {
+ configuration->set(minifi::Configure::nifi_flow_configuration_file,
*flow_yml_path);
+ }
configuration->set("c2.agent.heartbeat.period", "200");
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
@@ -76,7 +88,7 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
setProperties(processorController->getProcessor());
}
- void run(const std::string& flow_yml_path, const
utils::optional<std::string>& bootstrap_file = {}) override {
+ void run(const utils::optional<std::string>& flow_yml_path = {}, const
utils::optional<std::string>& bootstrap_file = {}) override {
setupFlow(flow_yml_path);
startFlowController();
@@ -96,6 +108,9 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
cleanup();
}
+
+ private:
+ utils::optional<std::string> path_;
};
class VerifyInvokeHTTPOKResponse : public VerifyInvokeHTTP {
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 6333635..4e61300 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -148,7 +148,7 @@ class C2Agent : public state::UpdateController {
/**
* Updates a property
*/
- bool update_property(const std::string &property_name, const std::string
&property_value, bool persist = false);
+ bool update_property(const std::string &property_name, const std::string
&property_value, bool persist);
/**
* Creates configuration options C2 payload for response
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index fe81da3..4be8ae6 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -56,6 +56,21 @@ enum Direction {
RECEIVE
};
+// A ValueNode that can additionally carry named annotations, each of which is
+// itself an AnnotatedValue.
+struct AnnotatedValue : state::response::ValueNode {
+ // reuse ValueNode's constructors and assignment operators unchanged
+ using state::response::ValueNode::ValueNode;
+ using state::response::ValueNode::operator=;
+
+ // Looks up the annotation stored under `name`.
+ // Returns an empty optional when there is no such annotation, otherwise a
+ // const reference wrapper to the element stored in `annotations`.
+ utils::optional<std::reference_wrapper<const AnnotatedValue>>
getAnnotation(const std::string& name) const {
+ auto it = annotations.find(name);
+ if (it == annotations.end()) {
+ return {};
+ }
+ return std::cref(it->second);
+ }
+
+ // annotation name -> annotation value
+ std::map<std::string, AnnotatedValue> annotations;
+};
+
struct C2ContentResponse {
explicit C2ContentResponse(Operation op)
:op{ op }
@@ -85,7 +100,7 @@ struct C2ContentResponse {
// name applied to commands
std::string name;
// commands that correspond with the operation.
- std::map<std::string, state::response::ValueNode> operation_arguments;
+ std::map<std::string, AnnotatedValue> operation_arguments;
};
/**
diff --git a/libminifi/include/properties/Configure.h
b/libminifi/include/core/AgentIdentificationProvider.h
similarity index 58%
copy from libminifi/include/properties/Configure.h
copy to libminifi/include/core/AgentIdentificationProvider.h
index 8b26bef..40f77f4 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/core/AgentIdentificationProvider.h
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,36 +15,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
#pragma once
#include <string>
-#include <utility>
-
-#include "properties/Configuration.h"
-#include "properties/Decryptor.h"
#include "utils/OptionalUtils.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
+namespace core {
-class Configure : public Configuration {
+/**
+ * Provides a single source of truth for the agent's class and identifier.
+ */
+class AgentIdentificationProvider {
public:
- explicit Configure(utils::optional<Decryptor> decryptor = utils::nullopt)
- : Configuration{}, decryptor_(std::move(decryptor)) {}
-
- bool get(const std::string& key, std::string& value) const;
- bool get(const std::string& key, const std::string& alternate_key,
std::string& value) const;
- utils::optional<std::string> get(const std::string& key) const;
+ virtual utils::optional<std::string> getAgentClass() const = 0;
- private:
- bool isEncrypted(const std::string& key) const;
+ virtual std::string getAgentIdentifier() const = 0;
- utils::optional<Decryptor> decryptor_;
+ virtual ~AgentIdentificationProvider() = default;
};
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h
b/libminifi/include/core/state/nodes/AgentInformation.h
index 14ab127..63b1e36 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -60,6 +60,8 @@
#include "core/state/nodes/StateMonitor.h"
#include "io/ClientSocket.h"
#include "SchedulingNodes.h"
+#include "utils/OptionalUtils.h"
+#include "core/AgentIdentificationProvider.h"
namespace org {
namespace apache {
@@ -502,12 +504,8 @@ class AgentIdentifier {
: include_agent_manifest_(true) {
}
- void setIdentifier(const std::string &identifier) {
- identifier_ = identifier;
- }
-
- void setAgentClass(const std::string &agentClass) {
- agent_class_ = agentClass;
+ void
setAgentIdentificationProvider(std::shared_ptr<core::AgentIdentificationProvider>
provider) {
+ provider_ = std::move(provider);
}
void includeAgentManifest(bool include) {
@@ -515,8 +513,7 @@ class AgentIdentifier {
}
protected:
- std::string identifier_;
- std::string agent_class_;
+ std::shared_ptr<core::AgentIdentificationProvider> provider_;
bool include_agent_manifest_;
};
@@ -644,14 +641,16 @@ class AgentNode : public DeviceInformation, public
AgentMonitor, public AgentIde
SerializedResponseNode ident;
ident.name = "identifier";
- ident.value = identifier_;
-
- SerializedResponseNode agentClass;
- agentClass.name = "agentClass";
- agentClass.value = agent_class_;
-
+ ident.value = provider_->getAgentIdentifier();
serialized.push_back(ident);
- serialized.push_back(agentClass);
+
+ utils::optional<std::string> agent_class = provider_->getAgentClass();
+ if (agent_class) {
+ SerializedResponseNode agentClass;
+ agentClass.name = "agentClass";
+ agentClass.value = *agent_class;
+ serialized.push_back(agentClass);
+ }
return serialized;
}
diff --git a/libminifi/include/properties/Configuration.h
b/libminifi/include/properties/Configuration.h
index a0655ae..39ecaaa 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -20,35 +20,18 @@
#include <string>
#include <mutex>
#include "properties/Properties.h"
+#include "utils/OptionalUtils.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
+// TODO(adebreceni): eliminate this class in a separate PR
class Configuration : public Properties {
public:
Configuration() : Properties("MiNiFi configuration") {}
- void setAgentIdentifier(const std::string &identifier) {
- std::lock_guard<std::mutex> lock(mutex_);
- agent_identifier_ = identifier;
- }
- std::string getAgentIdentifier() const {
- std::lock_guard<std::mutex> lock(mutex_);
- return agent_identifier_;
- }
-
- void setAgentClass(const std::string& agentClass) {
- std::lock_guard<std::mutex> lock(mutex_);
- agent_class_ = agentClass;
- }
-
- std::string getAgentClass() const {
- std::lock_guard<std::mutex> lock(mutex_);
- return agent_class_;
- }
-
// nifi.flow.configuration.file
static constexpr const char *nifi_default_directory =
"nifi.default.directory";
static constexpr const char *nifi_flow_configuration_file =
"nifi.flow.configuration.file";
@@ -115,11 +98,6 @@ class Configuration : public Properties {
static constexpr const char *minifi_disk_space_watchdog_interval =
"minifi.disk.space.watchdog.interval";
static constexpr const char *minifi_disk_space_watchdog_stop_threshold =
"minifi.disk.space.watchdog.stop.threshold";
static constexpr const char *minifi_disk_space_watchdog_restart_threshold =
"minifi.disk.space.watchdog.restart.threshold";
-
- private:
- std::string agent_identifier_;
- std::string agent_class_;
- mutable std::mutex mutex_;
};
} // namespace minifi
diff --git a/libminifi/include/properties/Configure.h
b/libminifi/include/properties/Configure.h
index 8b26bef..7073681 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -22,13 +22,14 @@
#include "properties/Configuration.h"
#include "properties/Decryptor.h"
#include "utils/OptionalUtils.h"
+#include "core/AgentIdentificationProvider.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
-class Configure : public Configuration {
+class Configure : public Configuration, public
core::AgentIdentificationProvider {
public:
explicit Configure(utils::optional<Decryptor> decryptor = utils::nullopt)
: Configuration{}, decryptor_(std::move(decryptor)) {}
@@ -37,10 +38,16 @@ class Configure : public Configuration {
bool get(const std::string& key, const std::string& alternate_key,
std::string& value) const;
utils::optional<std::string> get(const std::string& key) const;
+ utils::optional<std::string> getAgentClass() const override;
+ std::string getAgentIdentifier() const override;
+ void setFallbackAgentIdentifier(const std::string& id);
+
private:
bool isEncrypted(const std::string& key) const;
utils::optional<Decryptor> decryptor_;
+ mutable std::mutex fallback_identifier_mutex_;
+ std::string fallback_identifier_;
};
} // namespace minifi
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 28de0c8..a815a81 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -62,6 +62,28 @@ bool Configure::isEncrypted(const std::string& key) const {
return decryptor_->isValidEncryptionMarker(encryption_marker);
}
+// Returns the configured agent class, read through get() with
+// "nifi.c2.agent.class" as the key and "c2.agent.class" as the alternate key.
+// Returns an empty optional when the lookup fails or the value is empty.
+utils::optional<std::string> Configure::getAgentClass() const {
+ std::string agent_class;
+ if (get("nifi.c2.agent.class", "c2.agent.class", agent_class) &&
!agent_class.empty()) {
+ return agent_class;
+ }
+ return {};
+}
+
+// Returns the configured agent identifier, read through get() with
+// "nifi.c2.agent.identifier" as the key and "c2.agent.identifier" as the
+// alternate key. When the lookup fails or yields an empty string, the
+// fallback identifier is returned instead.
+std::string Configure::getAgentIdentifier() const {
+ std::string agent_id;
+ if (!get("nifi.c2.agent.identifier", "c2.agent.identifier", agent_id) ||
agent_id.empty()) {
+ // fallback_identifier_ is read under its mutex
+ std::lock_guard<std::mutex> guard(fallback_identifier_mutex_);
+ return fallback_identifier_;
+ }
+ return agent_id;
+}
+
+// Stores the identifier that getAgentIdentifier() returns when no non-empty
+// identifier is configured. The write happens under fallback_identifier_mutex_.
+void Configure::setFallbackAgentIdentifier(const std::string& id) {
+ std::lock_guard<std::mutex> guard(fallback_identifier_mutex_);
+ fallback_identifier_ = id;
+}
+
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 5a23d55..fc92e36 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -547,8 +547,7 @@ int16_t FlowController::clearConnection(const std::string
&connection) {
std::shared_ptr<state::response::ResponseNode>
FlowController::getAgentManifest() const {
auto agentInfo =
std::make_shared<state::response::AgentInformation>("agentInfo");
- agentInfo->setIdentifier(configuration_->getAgentIdentifier());
- agentInfo->setAgentClass(configuration_->getAgentClass());
+ agentInfo->setAgentIdentificationProvider(configuration_);
agentInfo->includeAgentStatus(false);
return agentInfo;
}
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index c94fa1b..36ff43c 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -555,14 +555,18 @@ void C2Agent::handle_update(const C2ContentResponse
&resp) {
if (resp.name == "configuration") {
handleConfigurationUpdate(resp);
} else if (resp.name == "properties") {
- bool update_occurred = false;
+ state::UpdateState result = state::UpdateState::FULLY_APPLIED;
for (auto entry : resp.operation_arguments) {
- if (update_property(entry.first, entry.second.to_string()))
- update_occurred = true;
- }
- if (update_occurred) {
- // enable updates to persist the configuration.
+ bool persist = (
+ entry.second.getAnnotation("persist")
+ | utils::map(&AnnotatedValue::to_string)
+ | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+ if (!update_property(entry.first, entry.second.to_string(), persist)) {
+ result = state::UpdateState::PARTIALLY_APPLIED;
+ }
}
+ C2Payload response(Operation::ACKNOWLEDGE, result, resp.ident, true);
+ enqueue_c2_response(std::move(response));
} else if (resp.name == "c2") {
// prior configuration options were already in place. thus
// we clear the map so that we don't go through replacing
@@ -638,14 +642,14 @@ void C2Agent::handle_update(const C2ContentResponse
&resp) {
* Updates a property
*/
bool C2Agent::update_property(const std::string &property_name, const
std::string &property_value, bool persist) {
- if (update_service_->canUpdate(property_name)) {
- configuration_->set(property_name, property_value);
- if (persist) {
- configuration_->persistProperties();
- return true;
- }
+ if (update_service_ && !update_service_->canUpdate(property_name)) {
+ return false;
+ }
+ configuration_->set(property_name, property_value);
+ if (!persist) {
+ return true;
}
- return false;
+ return configuration_->persistProperties();
}
void C2Agent::restart_agent() {
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index b1d6ea9..70dd84d 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -59,25 +59,15 @@ bool C2Client::isC2Enabled() const {
}
void C2Client::initialize(core::controller::ControllerServiceProvider
*controller, const std::shared_ptr<state::StateMonitor> &update_sink) {
- std::string class_str;
- configuration_->get("nifi.c2.agent.class", "c2.agent.class", class_str);
- configuration_->setAgentClass(class_str);
-
if (!isC2Enabled()) {
return;
}
- if (class_str.empty()) {
- logger_->log_error("Class name must be defined when C2 is enabled");
- throw std::runtime_error("Class name must be defined when C2 is enabled");
+ if (!configuration_->getAgentClass()) {
+ logger_->log_info("Agent class is not predefined");
}
- std::string identifier_str;
- if (!configuration_->get("nifi.c2.agent.identifier", "c2.agent.identifier",
identifier_str) || identifier_str.empty()) {
- // set to the flow controller's identifier
- identifier_str = getControllerUUID().to_string();
- }
- configuration_->setAgentIdentifier(identifier_str);
+ configuration_->setFallbackAgentIdentifier(getControllerUUID().to_string());
if (initialized_ && !flow_update_) {
return;
@@ -102,8 +92,7 @@ void
C2Client::initialize(core::controller::ControllerServiceProvider *controlle
}
auto identifier =
std::dynamic_pointer_cast<state::response::AgentIdentifier>(processor);
if (identifier != nullptr) {
- identifier->setIdentifier(identifier_str);
- identifier->setAgentClass(class_str);
+ identifier->setAgentIdentificationProvider(configuration_);
}
auto monitor =
std::dynamic_pointer_cast<state::response::AgentMonitor>(processor);
if (monitor != nullptr) {
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp
b/libminifi/src/c2/protocols/RESTProtocol.cpp
index 2f4fd8c..23a9e8d 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -38,6 +38,25 @@ namespace c2 {
#pragma push_macro("GetObject")
#undef GetObject
#endif
+
+/**
+ * Converts a JSON operation argument into an AnnotatedValue.
+ *
+ * Accepted shapes:
+ *  - an object with a string "value" member: "value" becomes the value and
+ *    every other member is parsed recursively and stored as an annotation
+ *    under its member name,
+ *  - a boolean,
+ *  - a string.
+ *
+ * @throws Exception for any other JSON type. rapidjson's GetString() requires
+ *         the value to be a string, so unsupported input is rejected up front
+ *         instead of being passed to it.
+ */
+AnnotatedValue parseAnnotatedValue(const rapidjson::Value& jsonValue) {
+  AnnotatedValue result;
+  if (jsonValue.IsObject() && jsonValue.HasMember("value")) {
+    const auto& value = jsonValue["value"];
+    if (!value.IsString()) {
+      throw Exception(SITE2SITE_EXCEPTION, "Invalid type for annotated value");
+    }
+    result = value.GetString();
+    for (const auto& annotation : jsonValue.GetObject()) {
+      // "value" is the payload itself, not an annotation
+      if (annotation.name.GetString() == std::string("value")) {
+        continue;
+      }
+      result.annotations[annotation.name.GetString()] = parseAnnotatedValue(annotation.value);
+    }
+  } else if (jsonValue.IsBool()) {
+    result = jsonValue.GetBool();
+  } else if (jsonValue.IsString()) {
+    result = jsonValue.GetString();
+  } else {
+    throw Exception(SITE2SITE_EXCEPTION, "Invalid type for operation argument");
+  }
+  return result;
+}
+
const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload,
const std::vector<char> &response) {
rapidjson::Document root;
@@ -47,21 +66,19 @@ const C2Payload RESTProtocol::parseJsonResponse(const
C2Payload &payload, const
std::string requested_operation = getOperation(payload);
std::string identifier;
-
- if (root.HasMember("operationid")) {
- identifier = root["operationid"].GetString();
- } else if (root.HasMember("operationId")) {
- identifier = root["operationId"].GetString();
- } else if (root.HasMember("identifier")) {
- identifier = root["identifier"].GetString();
+ for (auto key : {"operationid", "operationId", "identifier"}) {
+ if (root.HasMember(key)) {
+ identifier = root[key].GetString();
+ break;
+ }
}
int size = 0;
- if (root.HasMember("requested_operations")) {
- size = root["requested_operations"].Size();
- }
- if (root.HasMember("requestedOperations")) {
- size = root["requestedOperations"].Size();
+ for (auto key : {"requested_operations", "requestedOperations"}) {
+ if (root.HasMember(key)) {
+ size = root[key].Size();
+ break;
+ }
}
// neither must be there. We don't want assign array yet and cause an
assertion error
@@ -83,30 +100,18 @@ const C2Payload RESTProtocol::parseJsonResponse(const
C2Payload &payload, const
new_command.ttl = -1;
// set the identifier if one exists
- if (request.HasMember("operationid")) {
- if (request["operationid"].IsNumber())
- new_command.ident =
std::to_string(request["operationid"].GetInt64());
- else if (request["operationid"].IsString())
- new_command.ident = request["operationid"].GetString();
- else
- throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for
operationid"));
- nested_payload.setIdentifier(new_command.ident);
- } else if (request.HasMember("operationId")) {
- if (request["operationId"].IsNumber())
- new_command.ident =
std::to_string(request["operationId"].GetInt64());
- else if (request["operationId"].IsString())
- new_command.ident = request["operationId"].GetString();
- else
- throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for
operationId"));
- nested_payload.setIdentifier(new_command.ident);
- } else if (request.HasMember("identifier")) {
- if (request["identifier"].IsNumber())
- new_command.ident =
std::to_string(request["identifier"].GetInt64());
- else if (request["identifier"].IsString())
- new_command.ident = request["identifier"].GetString();
- else
- throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for
operationid"));
- nested_payload.setIdentifier(new_command.ident);
+ for (auto key : {"operationid", "operationId", "identifier"}) {
+ if (request.HasMember(key)) {
+ if (request[key].IsNumber()) {
+ new_command.ident = std::to_string(request[key].GetInt64());
+ } else if (request[key].IsString()) {
+ new_command.ident = request[key].GetString();
+ } else {
+ throw Exception(SITE2SITE_EXCEPTION, "Invalid type for " +
std::string{key});
+ }
+ nested_payload.setIdentifier(new_command.ident);
+ break;
+ }
}
if (request.HasMember("name")) {
@@ -115,23 +120,15 @@ const C2Payload RESTProtocol::parseJsonResponse(const
C2Payload &payload, const
new_command.name = request["operand"].GetString();
}
- if (request.HasMember("content") && request["content"].MemberCount() >
0) {
- if (request["content"].IsArray()) {
- for (const auto &member : request["content"].GetArray())
- new_command.operation_arguments[member.GetString()] =
member.GetString();
- } else {
- for (const auto &member : request["content"].GetObject())
- new_command.operation_arguments[member.name.GetString()] =
member.value.GetString();
- }
- } else if (request.HasMember("args") && request["args"].MemberCount()
> 0) {
- if (request["args"].IsArray()) {
- for (const auto &member : request["args"].GetArray())
- new_command.operation_arguments[member.GetString()] =
member.GetString();
- } else {
- for (const auto &member : request["args"].GetObject())
- new_command.operation_arguments[member.name.GetString()] =
member.value.GetString();
+ for (auto key : {"content", "args"}) {
+ if (request.HasMember(key) && request[key].IsObject()) {
+ for (const auto &member : request[key].GetObject()) {
+ new_command.operation_arguments[member.name.GetString()] =
parseAnnotatedValue(member.value);
+ }
+ break;
}
}
+
nested_payload.addContent(std::move(new_command));
new_payload.addPayload(std::move(nested_payload));
}
@@ -335,7 +332,7 @@ rapidjson::Value
RESTProtocol::serializeConnectionQueues(const C2Payload &payloa
updatedContent.name = uuid;
adjusted.setLabel(uuid);
adjusted.setIdentifier(uuid);
- state::response::ValueNode nd;
+ c2::AnnotatedValue nd;
// name should be what was previously the TLN ( top level node )
nd = name;
updatedContent.operation_arguments.insert(std::make_pair("name", nd));
diff --git a/libminifi/test/integration/IntegrationBase.h
b/libminifi/test/integration/IntegrationBase.h
index c0306da..5ca46e9 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -39,7 +39,7 @@ class IntegrationBase {
virtual ~IntegrationBase() = default;
- virtual void run(const std::string& test_file_location, const
utils::optional<std::string>& bootstrap_file = {});
+ virtual void run(const utils::optional<std::string>& test_file_location =
{}, const utils::optional<std::string>& bootstrap_file = {});
void setKeyDir(const std::string key_dir) {
this->key_dir = key_dir;
@@ -81,7 +81,7 @@ class IntegrationBase {
std::shared_ptr<minifi::Configure> configuration;
std::shared_ptr<minifi::FlowController> flowController_;
uint64_t wait_time_;
- std::string port, scheme, path;
+ std::string port, scheme;
std::string key_dir;
std::string state_dir;
};
@@ -101,13 +101,15 @@ void IntegrationBase::configureSecurity() {
}
}
-void IntegrationBase::run(const std::string& test_file_location, const
utils::optional<std::string>& home_path) {
+void IntegrationBase::run(const utils::optional<std::string>&
test_file_location, const utils::optional<std::string>& home_path) {
testSetup();
std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
std::shared_ptr<core::Repository> test_flow_repo =
std::make_shared<TestFlowRepository>();
- configuration->set(minifi::Configure::nifi_flow_configuration_file,
test_file_location);
+ if (test_file_location) {
+ configuration->set(minifi::Configure::nifi_flow_configuration_file,
*test_file_location);
+ }
configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name,
"UnorderedMapKeyValueStoreService");
configureC2();