This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 2346489  MINIFICPP-1439 - Startup without agent class, handle class update
2346489 is described below

commit 2346489446aff2868e2fd39f6a8e3585624f6a3e
Author: Adam Debreceni <[email protected]>
AuthorDate: Fri Jan 8 16:39:58 2021 +0100

    MINIFICPP-1439 - Startup without agent class, handle class update
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #969
---
 extensions/coap/tests/CoapIntegrationBase.h        |   8 +-
 extensions/http-curl/tests/C2RequestClassTest.cpp  | 139 +++++++++++++++++++++
 extensions/http-curl/tests/CMakeLists.txt          |   1 +
 extensions/http-curl/tests/HTTPIntegrationBase.h   |  38 ++++--
 .../http-curl/tests/VerifyInvokeHTTPTest.cpp       |  23 +++-
 libminifi/include/c2/C2Agent.h                     |   2 +-
 libminifi/include/c2/C2Payload.h                   |  17 ++-
 .../AgentIdentificationProvider.h}                 |  33 +++--
 .../include/core/state/nodes/AgentInformation.h    |  29 +++--
 libminifi/include/properties/Configuration.h       |  26 +---
 libminifi/include/properties/Configure.h           |   9 +-
 libminifi/src/Configure.cpp                        |  22 ++++
 libminifi/src/FlowController.cpp                   |   3 +-
 libminifi/src/c2/C2Agent.cpp                       |  30 +++--
 libminifi/src/c2/C2Client.cpp                      |  19 +--
 libminifi/src/c2/protocols/RESTProtocol.cpp        | 101 ++++++++-------
 libminifi/test/integration/IntegrationBase.h       |  10 +-
 17 files changed, 349 insertions(+), 161 deletions(-)

diff --git a/extensions/coap/tests/CoapIntegrationBase.h 
b/extensions/coap/tests/CoapIntegrationBase.h
index 6dce8d5..6ccee57 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -44,13 +44,15 @@ class CoapIntegrationBase : public IntegrationBase {
     server.reset();
   }
 
-  void run(const std::string& test_file_location, const 
utils::optional<std::string>& bootstrap_file = {}) override {
+  void run(const utils::optional<std::string>& test_file_location = {}, const 
utils::optional<std::string>& bootstrap_file = {}) override {
     testSetup();
 
     std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
     std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
 
-    configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
+    if (test_file_location) {
+      configuration->set(minifi::Configure::nifi_flow_configuration_file, 
*test_file_location);
+    }
     configuration->set("c2.agent.heartbeat.period", "200");
 
     std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
@@ -87,7 +89,7 @@ class CoapIntegrationBase : public IntegrationBase {
 };
 
 void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
-
+  std::string path;
   parse_http_components(url, port, scheme, path);
   CivetCallbacks callback{};
   if (url.find("localhost") != std::string::npos) {
diff --git a/extensions/http-curl/tests/C2RequestClassTest.cpp 
b/extensions/http-curl/tests/C2RequestClassTest.cpp
new file mode 100644
index 0000000..7521eb0
--- /dev/null
+++ b/extensions/http-curl/tests/C2RequestClassTest.cpp
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <vector>
+#include <string>
+
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
+#include "CivetStream.h"
+#include "StreamPipe.h"
+#include "OptionalUtils.h"
+
+class C2AcknowledgeHandler : public ServerAwareHandler {
+ public:
+  bool handlePost(CivetServer *server, struct mg_connection *conn) override {
+    std::string req = readPayload(conn);
+    rapidjson::Document root;
+    root.Parse(req.data(), req.size());
+    if (root.HasMember("operationId")) {
+      std::lock_guard<std::mutex> guard(mtx_);
+      acknowledged_operations_.insert(root["operationId"].GetString());
+    }
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                    "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
+    return true;
+  }
+
+  bool isAcknowledged(const std::string& operation_id) const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return acknowledged_operations_.count(operation_id) > 0;
+  }
+
+ private:
+  mutable std::mutex mtx_;
+  std::set<std::string> acknowledged_operations_;
+};
+
+class C2HeartbeatHandler : public ServerAwareHandler {
+ public:
+  explicit C2HeartbeatHandler(std::string response) : 
response_(std::move(response)) {}
+
+  bool handlePost(CivetServer *server, struct mg_connection *conn) override {
+    std::string req = readPayload(conn);
+    rapidjson::Document root;
+    root.Parse(req.data(), req.size());
+    utils::optional<std::string> agent_class;
+    if (root["agentInfo"].HasMember("agentClass")) {
+      agent_class = root["agentInfo"]["agentClass"].GetString();
+    }
+    {
+      std::lock_guard<std::mutex> lock(mtx_);
+      classes_.push_back(agent_class);
+    }
+
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                    "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              response_.length());
+    mg_printf(conn, "%s", response_.c_str());
+    return true;
+  }
+
+  bool gotClassesInOrder(const std::vector<utils::optional<std::string>>& 
class_names) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    auto it = classes_.begin();
+    for (const auto& class_name : class_names) {
+      it = std::find(classes_.begin(), classes_.end(), class_name);
+      if (it == classes_.end()) {
+        return false;
+      }
+      ++it;
+    }
+    return true;
+  }
+
+ private:
+  std::mutex mtx_;
+  std::vector<utils::optional<std::string>> classes_;
+  std::string response_;
+};
+
+class VerifyC2ClassRequest : public VerifyC2Base {
+ public:
+  explicit VerifyC2ClassRequest(std::function<bool()> verify) : 
verify_(std::move(verify)) {}
+
+  void configureC2() override {
+    configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
+    configuration->set("nifi.c2.enable", "true");
+    configuration->set("nifi.c2.agent.heartbeat.period", "100");
+    configuration->set("nifi.c2.root.classes", 
"DeviceInfoNode,AgentInformation,FlowInformation");
+  }
+
+  void runAssertions() override {
+    assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(3), 
verify_));
+  }
+
+ private:
+  std::function<bool()> verify_;
+};
+
+int main() {
+  const std::string class_update_id = "321";
+  C2HeartbeatHandler heartbeat_handler(R"({
+    "requested_operations": [{
+      "operation": "update",
+      "name": "properties",
+      "operationId": ")" + class_update_id + R"(",
+      "args": {
+        "nifi.c2.agent.class": {"value": "TestClass", "persist": true}
+      }
+    }]})");
+  C2AcknowledgeHandler ack_handler;
+
+  VerifyC2ClassRequest harness([&]() -> bool {
+    return heartbeat_handler.gotClassesInOrder({{}, {"TestClass"}}) &&
+        ack_handler.isAcknowledged(class_update_id);
+  });
+  harness.setUrl("http://localhost:0/api/heartbeat", &heartbeat_handler);
+  harness.setUrl("http://localhost:0/api/acknowledge", &ack_handler);
+  harness.setC2Url("/api/heartbeat", "/api/acknowledge");
+
+  harness.run();
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt 
b/extensions/http-curl/tests/CMakeLists.txt
index 2378862..04a9627 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -78,6 +78,7 @@ add_test(NAME C2DescribeCoreComponentStateTest COMMAND 
C2DescribeCoreComponentSt
 add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest 
"${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest 
"${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/" 
"${TEST_RESOURCES}/TestBad.yml")
 add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration 
"${TEST_RESOURCES}/TestNull.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2RequestClassTest COMMAND C2RequestClassTest)
 if(NOT OPENSSL_OFF)
 add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest 
"${TEST_RESOURCES}/TestHTTPGetSecure.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2VerifyHeartbeatAndStopSecure COMMAND C2VerifyHeartbeatAndStop 
"${TEST_RESOURCES}/C2VerifyHeartbeatAndStopSecure.yml" "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h 
b/extensions/http-curl/tests/HTTPIntegrationBase.h
index cbdbdd3..48724bb 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -43,7 +43,9 @@ public:
         server(nullptr) {
   }
 
-  void setUrl(const std::string &url, ServerAwareHandler *handler);
+  virtual void setUrl(const std::string &url, ServerAwareHandler *handler);
+
+  void setC2Url(const std::string& heartbeat_path, const std::string& 
acknowledge_path);
 
   void shutdownBeforeFlowController() override {
     server.reset();
@@ -68,21 +70,31 @@ public:
 };
 
 void HTTPIntegrationBase::setUrl(const std::string &url, ServerAwareHandler 
*handler) {
-  parse_http_components(url, port, scheme, path);
-  CivetCallbacks callback{};
+  std::string url_port, url_scheme, url_path;
+  parse_http_components(url, url_port, url_scheme, url_path);
   if (server) {
-    server->addHandler(path, handler);
+    if (url_port != "0" && url_port != port) {
+      throw std::logic_error("Inconsistent port requirements");
+    }
+    if (url_scheme != scheme) {
+      throw std::logic_error("Inconsistent scheme requirements");
+    }
+    server->addHandler(url_path, handler);
     return;
   }
+  // initialize server
+  scheme = url_scheme;
+  port = url_port;
+  CivetCallbacks callback{};
   if (scheme == "https" && !key_dir.empty()) {
     std::string cert = key_dir + "nifi-cert.pem";
     memset(&callback, 0, sizeof(callback));
     callback.init_ssl = ssl_enable;
     port += "s";
     callback.log_message = log_message;
-    server = utils::make_unique<TestServer>(port, path, handler, &callback, 
cert, cert);
+    server = utils::make_unique<TestServer>(port, url_path, handler, 
&callback, cert, cert);
   } else {
-    server = utils::make_unique<TestServer>(port, path, handler);
+    server = utils::make_unique<TestServer>(port, url_path, handler);
   }
   bool secure{false};
   if (port == "0" || port == "0s") {
@@ -92,13 +104,23 @@ void HTTPIntegrationBase::setUrl(const std::string &url, 
ServerAwareHandler *han
       port += "s";
     }
   }
-  std::string c2_url = std::string("http") + (secure ? "s" : "") + 
"://localhost:" + getWebPort() + path;
+  std::string c2_url = std::string("http") + (secure ? "s" : "") + 
"://localhost:" + getWebPort() + url_path;
   configuration->set("nifi.c2.rest.url", c2_url);
   configuration->set("nifi.c2.rest.url.ack", c2_url);
 }
 
+void HTTPIntegrationBase::setC2Url(const std::string &heartbeat_path, const 
std::string &acknowledge_path) {
+  if (port.empty()) {
+    throw std::logic_error("Port is not yet initialized");
+  }
+  bool secure = port.back() == 's';
+  std::string base = std::string("http") + (secure ? "s" : "") + 
"://localhost:" + getWebPort();
+  configuration->set("nifi.c2.rest.url", base + heartbeat_path);
+  configuration->set("nifi.c2.rest.url.ack", base + acknowledge_path);
+}
+
 class VerifyC2Base : public HTTPIntegrationBase {
-public:
+ public:
   void testSetup() override {
     LogTestController::getInstance().setDebug<utils::HTTPClient>();
     LogTestController::getInstance().setDebug<LogTestController>();
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp 
b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index 42b1b9c..d6422e3 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -44,18 +44,30 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
   void cleanup() override {
   }
 
+  void setUrl(const std::string &url, ServerAwareHandler *handler) override {
+    if (path_) {
+      throw std::logic_error("Url is already set");
+    }
+    std::string port, scheme, path;
+    parse_http_components(url, port, scheme, path);
+    path_ = path;
+    HTTPIntegrationBase::setUrl(url, handler);
+  }
+
   void setProperties(std::shared_ptr<core::Processor> proc) {
-    std::string url = scheme + "://localhost:" + getWebPort() + path;
+    std::string url = scheme + "://localhost:" + getWebPort() + *path_;
     proc->setProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
   }
 
-  void setupFlow(const std::string& flow_yml_path) {
+  void setupFlow(const utils::optional<std::string>& flow_yml_path) {
     testSetup();
 
     std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
     std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
 
-    configuration->set(minifi::Configure::nifi_flow_configuration_file, 
flow_yml_path);
+    if (flow_yml_path) {
+      configuration->set(minifi::Configure::nifi_flow_configuration_file, 
*flow_yml_path);
+    }
     configuration->set("c2.agent.heartbeat.period", "200");
     std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
     content_repo->initialize(configuration);
@@ -76,7 +88,7 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
     setProperties(processorController->getProcessor());
   }
 
-  void run(const std::string& flow_yml_path, const 
utils::optional<std::string>& bootstrap_file = {}) override {
+  void run(const utils::optional<std::string>& flow_yml_path = {}, const 
utils::optional<std::string>& bootstrap_file = {}) override {
     setupFlow(flow_yml_path);
     startFlowController();
 
@@ -96,6 +108,9 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
 
     cleanup();
   }
+
+ private:
+  utils::optional<std::string> path_;
 };
 
 class VerifyInvokeHTTPOKResponse : public VerifyInvokeHTTP {
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 6333635..4e61300 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -148,7 +148,7 @@ class C2Agent : public state::UpdateController {
   /**
    * Updates a property
    */
-  bool update_property(const std::string &property_name, const std::string 
&property_value,  bool persist = false);
+  bool update_property(const std::string &property_name, const std::string 
&property_value,  bool persist);
 
   /**
    * Creates configuration options C2 payload for response
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index fe81da3..4be8ae6 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -56,6 +56,21 @@ enum Direction {
   RECEIVE
 };
 
+struct AnnotatedValue : state::response::ValueNode {
+  using state::response::ValueNode::ValueNode;
+  using state::response::ValueNode::operator=;
+
+  utils::optional<std::reference_wrapper<const AnnotatedValue>> 
getAnnotation(const std::string& name) const {
+    auto it = annotations.find(name);
+    if (it == annotations.end()) {
+      return {};
+    }
+    return std::cref(it->second);
+  }
+
+  std::map<std::string, AnnotatedValue> annotations;
+};
+
 struct C2ContentResponse {
   explicit C2ContentResponse(Operation op)
       :op{ op }
@@ -85,7 +100,7 @@ struct C2ContentResponse {
   // name applied to commands
   std::string name;
   // commands that correspond with the operation.
-  std::map<std::string, state::response::ValueNode> operation_arguments;
+  std::map<std::string, AnnotatedValue> operation_arguments;
 };
 
 /**
diff --git a/libminifi/include/properties/Configure.h 
b/libminifi/include/core/AgentIdentificationProvider.h
similarity index 58%
copy from libminifi/include/properties/Configure.h
copy to libminifi/include/core/AgentIdentificationProvider.h
index 8b26bef..40f77f4 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/core/AgentIdentificationProvider.h
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,36 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 #pragma once
 
 #include <string>
-#include <utility>
-
-#include "properties/Configuration.h"
-#include "properties/Decryptor.h"
 #include "utils/OptionalUtils.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
+namespace core {
 
-class Configure : public Configuration {
+/**
+ * Provides a single source of truth for the agent's class and identifier.
+ */
+class AgentIdentificationProvider {
  public:
-  explicit Configure(utils::optional<Decryptor> decryptor = utils::nullopt)
-      : Configuration{}, decryptor_(std::move(decryptor)) {}
-
-  bool get(const std::string& key, std::string& value) const;
-  bool get(const std::string& key, const std::string& alternate_key, 
std::string& value) const;
-  utils::optional<std::string> get(const std::string& key) const;
+  virtual utils::optional<std::string> getAgentClass() const = 0;
 
- private:
-  bool isEncrypted(const std::string& key) const;
+  virtual std::string getAgentIdentifier() const = 0;
 
-  utils::optional<Decryptor> decryptor_;
+  virtual ~AgentIdentificationProvider() = default;
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h 
b/libminifi/include/core/state/nodes/AgentInformation.h
index 14ab127..63b1e36 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -60,6 +60,8 @@
 #include "core/state/nodes/StateMonitor.h"
 #include "io/ClientSocket.h"
 #include "SchedulingNodes.h"
+#include "utils/OptionalUtils.h"
+#include "core/AgentIdentificationProvider.h"
 
 namespace org {
 namespace apache {
@@ -502,12 +504,8 @@ class AgentIdentifier {
      : include_agent_manifest_(true) {
   }
 
-  void setIdentifier(const std::string &identifier) {
-    identifier_ = identifier;
-  }
-
-  void setAgentClass(const std::string &agentClass) {
-    agent_class_ = agentClass;
+  void 
setAgentIdentificationProvider(std::shared_ptr<core::AgentIdentificationProvider>
 provider) {
+    provider_ = std::move(provider);
   }
 
   void includeAgentManifest(bool include) {
@@ -515,8 +513,7 @@ class AgentIdentifier {
   }
 
  protected:
-  std::string identifier_;
-  std::string agent_class_;
+  std::shared_ptr<core::AgentIdentificationProvider> provider_;
   bool include_agent_manifest_;
 };
 
@@ -644,14 +641,16 @@ class AgentNode : public DeviceInformation, public 
AgentMonitor, public AgentIde
     SerializedResponseNode ident;
 
     ident.name = "identifier";
-    ident.value = identifier_;
-
-    SerializedResponseNode agentClass;
-    agentClass.name = "agentClass";
-    agentClass.value = agent_class_;
-
+    ident.value = provider_->getAgentIdentifier();
     serialized.push_back(ident);
-    serialized.push_back(agentClass);
+
+    utils::optional<std::string> agent_class = provider_->getAgentClass();
+    if (agent_class) {
+      SerializedResponseNode agentClass;
+      agentClass.name = "agentClass";
+      agentClass.value = *agent_class;
+      serialized.push_back(agentClass);
+    }
 
     return serialized;
   }
diff --git a/libminifi/include/properties/Configuration.h 
b/libminifi/include/properties/Configuration.h
index a0655ae..39ecaaa 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -20,35 +20,18 @@
 #include <string>
 #include <mutex>
 #include "properties/Properties.h"
+#include "utils/OptionalUtils.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 
+// TODO(adebreceni): eliminate this class in a separate PR
 class Configuration : public Properties {
  public:
   Configuration() : Properties("MiNiFi configuration") {}
 
-  void setAgentIdentifier(const std::string &identifier) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    agent_identifier_ = identifier;
-  }
-  std::string getAgentIdentifier() const {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return agent_identifier_;
-  }
-
-  void setAgentClass(const std::string& agentClass) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    agent_class_ = agentClass;
-  }
-
-  std::string getAgentClass() const {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return agent_class_;
-  }
-
   // nifi.flow.configuration.file
   static constexpr const char *nifi_default_directory = 
"nifi.default.directory";
   static constexpr const char *nifi_flow_configuration_file = 
"nifi.flow.configuration.file";
@@ -115,11 +98,6 @@ class Configuration : public Properties {
   static constexpr const char *minifi_disk_space_watchdog_interval = 
"minifi.disk.space.watchdog.interval";
   static constexpr const char *minifi_disk_space_watchdog_stop_threshold = 
"minifi.disk.space.watchdog.stop.threshold";
   static constexpr const char *minifi_disk_space_watchdog_restart_threshold = 
"minifi.disk.space.watchdog.restart.threshold";
-
- private:
-  std::string agent_identifier_;
-  std::string agent_class_;
-  mutable std::mutex mutex_;
 };
 
 }  // namespace minifi
diff --git a/libminifi/include/properties/Configure.h 
b/libminifi/include/properties/Configure.h
index 8b26bef..7073681 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -22,13 +22,14 @@
 #include "properties/Configuration.h"
 #include "properties/Decryptor.h"
 #include "utils/OptionalUtils.h"
+#include "core/AgentIdentificationProvider.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 
-class Configure : public Configuration {
+class Configure : public Configuration, public 
core::AgentIdentificationProvider {
  public:
   explicit Configure(utils::optional<Decryptor> decryptor = utils::nullopt)
       : Configuration{}, decryptor_(std::move(decryptor)) {}
@@ -37,10 +38,16 @@ class Configure : public Configuration {
   bool get(const std::string& key, const std::string& alternate_key, 
std::string& value) const;
   utils::optional<std::string> get(const std::string& key) const;
 
+  utils::optional<std::string> getAgentClass() const override;
+  std::string getAgentIdentifier() const override;
+  void setFallbackAgentIdentifier(const std::string& id);
+
  private:
   bool isEncrypted(const std::string& key) const;
 
   utils::optional<Decryptor> decryptor_;
+  mutable std::mutex fallback_identifier_mutex_;
+  std::string fallback_identifier_;
 };
 
 }  // namespace minifi
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 28de0c8..a815a81 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -62,6 +62,28 @@ bool Configure::isEncrypted(const std::string& key) const {
   return decryptor_->isValidEncryptionMarker(encryption_marker);
 }
 
+utils::optional<std::string> Configure::getAgentClass() const {
+  std::string agent_class;
+  if (get("nifi.c2.agent.class", "c2.agent.class", agent_class) && 
!agent_class.empty()) {
+    return agent_class;
+  }
+  return {};
+}
+
+std::string Configure::getAgentIdentifier() const {
+  std::string agent_id;
+  if (!get("nifi.c2.agent.identifier", "c2.agent.identifier", agent_id) || 
agent_id.empty()) {
+    std::lock_guard<std::mutex> guard(fallback_identifier_mutex_);
+    return fallback_identifier_;
+  }
+  return agent_id;
+}
+
+void Configure::setFallbackAgentIdentifier(const std::string& id) {
+  std::lock_guard<std::mutex> guard(fallback_identifier_mutex_);
+  fallback_identifier_ = id;
+}
+
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 5a23d55..fc92e36 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -547,8 +547,7 @@ int16_t FlowController::clearConnection(const std::string 
&connection) {
 
 std::shared_ptr<state::response::ResponseNode> 
FlowController::getAgentManifest() const {
   auto agentInfo = 
std::make_shared<state::response::AgentInformation>("agentInfo");
-  agentInfo->setIdentifier(configuration_->getAgentIdentifier());
-  agentInfo->setAgentClass(configuration_->getAgentClass());
+  agentInfo->setAgentIdentificationProvider(configuration_);
   agentInfo->includeAgentStatus(false);
   return agentInfo;
 }
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index c94fa1b..36ff43c 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -555,14 +555,18 @@ void C2Agent::handle_update(const C2ContentResponse 
&resp) {
   if (resp.name == "configuration") {
     handleConfigurationUpdate(resp);
   } else if (resp.name == "properties") {
-    bool update_occurred = false;
+    state::UpdateState result = state::UpdateState::FULLY_APPLIED;
     for (auto entry : resp.operation_arguments) {
-      if (update_property(entry.first, entry.second.to_string()))
-        update_occurred = true;
-    }
-    if (update_occurred) {
-      // enable updates to persist the configuration.
+      bool persist = (
+          entry.second.getAnnotation("persist")
+          | utils::map(&AnnotatedValue::to_string)
+          | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+      if (!update_property(entry.first, entry.second.to_string(), persist)) {
+        result = state::UpdateState::PARTIALLY_APPLIED;
+      }
     }
+    C2Payload response(Operation::ACKNOWLEDGE, result, resp.ident, true);
+    enqueue_c2_response(std::move(response));
   } else if (resp.name == "c2") {
     // prior configuration options were already in place. thus
     // we clear the map so that we don't go through replacing
@@ -638,14 +642,14 @@ void C2Agent::handle_update(const C2ContentResponse 
&resp) {
  * Updates a property
  */
 bool C2Agent::update_property(const std::string &property_name, const 
std::string &property_value, bool persist) {
-  if (update_service_->canUpdate(property_name)) {
-    configuration_->set(property_name, property_value);
-    if (persist) {
-      configuration_->persistProperties();
-      return true;
-    }
+  if (update_service_ && !update_service_->canUpdate(property_name)) {
+    return false;
+  }
+  configuration_->set(property_name, property_value);
+  if (!persist) {
+    return true;
   }
-  return false;
+  return configuration_->persistProperties();
 }
 
 void C2Agent::restart_agent() {
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index b1d6ea9..70dd84d 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -59,25 +59,15 @@ bool C2Client::isC2Enabled() const {
 }
 
 void C2Client::initialize(core::controller::ControllerServiceProvider 
*controller, const std::shared_ptr<state::StateMonitor> &update_sink) {
-  std::string class_str;
-  configuration_->get("nifi.c2.agent.class", "c2.agent.class", class_str);
-  configuration_->setAgentClass(class_str);
-
   if (!isC2Enabled()) {
     return;
   }
 
-  if (class_str.empty()) {
-    logger_->log_error("Class name must be defined when C2 is enabled");
-    throw std::runtime_error("Class name must be defined when C2 is enabled");
+  if (!configuration_->getAgentClass()) {
+    logger_->log_info("Agent class is not predefined");
   }
 
-  std::string identifier_str;
-  if (!configuration_->get("nifi.c2.agent.identifier", "c2.agent.identifier", 
identifier_str) || identifier_str.empty()) {
-    // set to the flow controller's identifier
-    identifier_str = getControllerUUID().to_string();
-  }
-  configuration_->setAgentIdentifier(identifier_str);
+  configuration_->setFallbackAgentIdentifier(getControllerUUID().to_string());
 
   if (initialized_ && !flow_update_) {
     return;
@@ -102,8 +92,7 @@ void 
C2Client::initialize(core::controller::ControllerServiceProvider *controlle
       }
       auto identifier = 
std::dynamic_pointer_cast<state::response::AgentIdentifier>(processor);
       if (identifier != nullptr) {
-        identifier->setIdentifier(identifier_str);
-        identifier->setAgentClass(class_str);
+        identifier->setAgentIdentificationProvider(configuration_);
       }
       auto monitor = 
std::dynamic_pointer_cast<state::response::AgentMonitor>(processor);
       if (monitor != nullptr) {
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp 
b/libminifi/src/c2/protocols/RESTProtocol.cpp
index 2f4fd8c..23a9e8d 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -38,6 +38,25 @@ namespace c2 {
 #pragma push_macro("GetObject")
 #undef GetObject
 #endif
+
+AnnotatedValue parseAnnotatedValue(const rapidjson::Value& jsonValue) {
+  AnnotatedValue result;
+  if (jsonValue.IsObject() && jsonValue.HasMember("value")) {
+    result = jsonValue["value"].GetString();
+    for (const auto& annotation : jsonValue.GetObject()) {
+      if (annotation.name.GetString() == std::string("value")) {
+        continue;
+      }
+      result.annotations[annotation.name.GetString()] = 
parseAnnotatedValue(annotation.value);
+    }
+  } else if (jsonValue.IsBool()) {
+    result = jsonValue.GetBool();
+  } else {
+    result = jsonValue.GetString();
+  }
+  return result;
+}
+
 const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, 
const std::vector<char> &response) {
   rapidjson::Document root;
 
@@ -47,21 +66,19 @@ const C2Payload RESTProtocol::parseJsonResponse(const 
C2Payload &payload, const
       std::string requested_operation = getOperation(payload);
 
       std::string identifier;
-
-      if (root.HasMember("operationid")) {
-        identifier = root["operationid"].GetString();
-      } else if (root.HasMember("operationId")) {
-        identifier = root["operationId"].GetString();
-      } else if (root.HasMember("identifier")) {
-        identifier = root["identifier"].GetString();
+      for (auto key : {"operationid", "operationId", "identifier"}) {
+        if (root.HasMember(key)) {
+          identifier = root[key].GetString();
+          break;
+        }
       }
 
       int size = 0;
-      if (root.HasMember("requested_operations")) {
-        size = root["requested_operations"].Size();
-      }
-      if (root.HasMember("requestedOperations")) {
-        size = root["requestedOperations"].Size();
+      for (auto key : {"requested_operations", "requestedOperations"}) {
+        if (root.HasMember(key)) {
+          size = root[key].Size();
+          break;
+        }
       }
 
       // neither must be there. We don't want assign array yet and cause an 
assertion error
@@ -83,30 +100,18 @@ const C2Payload RESTProtocol::parseJsonResponse(const 
C2Payload &payload, const
         new_command.ttl = -1;
 
         // set the identifier if one exists
-        if (request.HasMember("operationid")) {
-          if (request["operationid"].IsNumber())
-            new_command.ident = 
std::to_string(request["operationid"].GetInt64());
-          else if (request["operationid"].IsString())
-            new_command.ident = request["operationid"].GetString();
-          else
-            throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for 
operationid"));
-          nested_payload.setIdentifier(new_command.ident);
-        } else if (request.HasMember("operationId")) {
-          if (request["operationId"].IsNumber())
-            new_command.ident = 
std::to_string(request["operationId"].GetInt64());
-          else if (request["operationId"].IsString())
-            new_command.ident = request["operationId"].GetString();
-          else
-            throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for 
operationId"));
-          nested_payload.setIdentifier(new_command.ident);
-        } else if (request.HasMember("identifier")) {
-          if (request["identifier"].IsNumber())
-            new_command.ident = 
std::to_string(request["identifier"].GetInt64());
-          else if (request["identifier"].IsString())
-            new_command.ident = request["identifier"].GetString();
-          else
-            throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for 
operationid"));
-          nested_payload.setIdentifier(new_command.ident);
+        for (auto key : {"operationid", "operationId", "identifier"}) {
+          if (request.HasMember(key)) {
+            if (request[key].IsNumber()) {
+              new_command.ident = std::to_string(request[key].GetInt64());
+            } else if (request[key].IsString()) {
+              new_command.ident = request[key].GetString();
+            } else {
+              throw Exception(SITE2SITE_EXCEPTION, "Invalid type for " + 
std::string{key});
+            }
+            nested_payload.setIdentifier(new_command.ident);
+            break;
+          }
         }
 
         if (request.HasMember("name")) {
@@ -115,23 +120,15 @@ const C2Payload RESTProtocol::parseJsonResponse(const 
C2Payload &payload, const
           new_command.name = request["operand"].GetString();
         }
 
-        if (request.HasMember("content") && request["content"].MemberCount() > 
0) {
-          if (request["content"].IsArray()) {
-            for (const auto &member : request["content"].GetArray())
-              new_command.operation_arguments[member.GetString()] = 
member.GetString();
-          } else {
-            for (const auto &member : request["content"].GetObject())
-              new_command.operation_arguments[member.name.GetString()] = 
member.value.GetString();
-          }
-        } else if (request.HasMember("args") && request["args"].MemberCount() 
> 0) {
-          if (request["args"].IsArray()) {
-            for (const auto &member : request["args"].GetArray())
-              new_command.operation_arguments[member.GetString()] = 
member.GetString();
-          } else {
-            for (const auto &member : request["args"].GetObject())
-              new_command.operation_arguments[member.name.GetString()] = 
member.value.GetString();
+        for (auto key : {"content", "args"}) {
+          if (request.HasMember(key) && request[key].IsObject()) {
+            for (const auto &member : request[key].GetObject()) {
+              new_command.operation_arguments[member.name.GetString()] = 
parseAnnotatedValue(member.value);
+            }
+            break;
           }
         }
+
         nested_payload.addContent(std::move(new_command));
         new_payload.addPayload(std::move(nested_payload));
       }
@@ -335,7 +332,7 @@ rapidjson::Value 
RESTProtocol::serializeConnectionQueues(const C2Payload &payloa
   updatedContent.name = uuid;
   adjusted.setLabel(uuid);
   adjusted.setIdentifier(uuid);
-  state::response::ValueNode nd;
+  c2::AnnotatedValue nd;
   // name should be what was previously the TLN ( top level node )
   nd = name;
   updatedContent.operation_arguments.insert(std::make_pair("name", nd));
diff --git a/libminifi/test/integration/IntegrationBase.h 
b/libminifi/test/integration/IntegrationBase.h
index c0306da..5ca46e9 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -39,7 +39,7 @@ class IntegrationBase {
 
   virtual ~IntegrationBase() = default;
 
-  virtual void run(const std::string& test_file_location, const 
utils::optional<std::string>& bootstrap_file = {});
+  virtual void run(const utils::optional<std::string>& test_file_location = 
{}, const utils::optional<std::string>& bootstrap_file = {});
 
   void setKeyDir(const std::string key_dir) {
     this->key_dir = key_dir;
@@ -81,7 +81,7 @@ class IntegrationBase {
   std::shared_ptr<minifi::Configure> configuration;
   std::shared_ptr<minifi::FlowController> flowController_;
   uint64_t wait_time_;
-  std::string port, scheme, path;
+  std::string port, scheme;
   std::string key_dir;
   std::string state_dir;
 };
@@ -101,13 +101,15 @@ void IntegrationBase::configureSecurity() {
   }
 }
 
-void IntegrationBase::run(const std::string& test_file_location, const 
utils::optional<std::string>& home_path) {
+void IntegrationBase::run(const utils::optional<std::string>& 
test_file_location, const utils::optional<std::string>& home_path) {
   testSetup();
 
   std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
   std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
 
-  configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
+  if (test_file_location) {
+    configuration->set(minifi::Configure::nifi_flow_configuration_file, 
*test_file_location);
+  }
   
configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name,
 "UnorderedMapKeyValueStoreService");
 
   configureC2();

Reply via email to