This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e8736bd04414090890f948dc02de406a40b12103
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Mar 29 16:59:20 2022 +0200

    MINIFICPP-1780 Restart agent after C2 property update
    
    Co-authored-by: Marton Szasz <[email protected]>
    Signed-off-by: Ferenc Gerlits <[email protected]>
    This closes #1299
---
 extensions/coap/tests/CoapIntegrationBase.h        |   3 +-
 extensions/http-curl/protocols/RESTSender.cpp      |   2 +-
 extensions/http-curl/tests/C2PauseResumeTest.cpp   |   3 +-
 .../http-curl/tests/C2PropertiesUpdateTests.cpp    |  23 +-
 .../tests/ControllerServiceIntegrationTests.cpp    |  12 +-
 extensions/http-curl/tests/HTTPHandlers.h          |   2 +-
 extensions/http-curl/tests/HTTPIntegrationBase.h   |   4 +
 extensions/systemd/CMakeLists.txt                  |   2 +-
 extensions/systemd/ConsumeJournald.cpp             |   2 +-
 extensions/systemd/ConsumeJournald.h               |   4 +-
 libminifi/include/FlowController.h                 |  21 +-
 libminifi/include/agent/build_description.h        |  36 +-
 libminifi/include/c2/C2Agent.h                     |  10 +-
 libminifi/include/c2/C2Client.h                    |  14 +-
 libminifi/include/c2/C2Payload.h                   |   2 +-
 libminifi/include/core/state/Value.h               | 124 +++---
 .../include/core/state/nodes/AgentInformation.h    | 207 ++++------
 .../include/core/state/nodes/FlowInformation.h     |  12 +-
 libminifi/include/core/state/nodes/MetricsBase.h   |   2 +-
 libminifi/include/properties/Configuration.h       |   1 +
 .../include/utils/FifoExecutor.h                   |  13 +-
 libminifi/include/utils/SmallString.h              |   4 +
 libminifi/include/utils/file/FileUtils.h           |   6 +-
 .../include/utils/meta/type_list.h                 |  36 +-
 libminifi/src/Configuration.cpp                    |   1 +
 libminifi/src/FlowController.cpp                   |  25 +-
 libminifi/src/c2/C2Agent.cpp                       |  48 +--
 libminifi/src/c2/C2Client.cpp                      |  30 +-
 libminifi/src/core/state/Value.cpp                 |  48 ++-
 .../src/utils/FifoExecutor.cpp                     |  15 +-
 libminifi/src/utils/file/FileUtils.cpp             |   7 +
 libminifi/test/aws-tests/FetchS3ObjectTests.cpp    |   8 +-
 libminifi/test/flow-tests/TestControllerWithFlow.h |   3 +-
 libminifi/test/integration/IntegrationBase.h       | 116 ++++--
 .../test/integration/ProvenanceReportingTest.cpp   |  18 +-
 .../test/persistence-tests/PersistenceTests.cpp    |   4 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp         |   2 +-
 libminifi/test/unit/ProvenanceTestHelper.h         |   3 +-
 main/AgentDocs.cpp                                 |  40 +-
 main/AgentDocs.h                                   |  19 +-
 main/MiNiFiMain.cpp                                | 449 +++++++++++----------
 nanofi/src/cxx/C2CallbackAgent.cpp                 |  14 +-
 42 files changed, 689 insertions(+), 706 deletions(-)

diff --git a/extensions/coap/tests/CoapIntegrationBase.h 
b/extensions/coap/tests/CoapIntegrationBase.h
index 74f7c344f..520aaaf76 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -74,7 +74,8 @@ class CoapIntegrationBase : public IntegrationBase {
 
     std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
 
-    std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, 
configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME);
+    std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, 
configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+      std::make_shared<utils::file::FileSystem>(), []{});
 
     controller->load();
     controller->start();
diff --git a/extensions/http-curl/protocols/RESTSender.cpp 
b/extensions/http-curl/protocols/RESTSender.cpp
index 64c7408ea..af15b4dbf 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -141,7 +141,7 @@ C2Payload RESTSender::sendPayload(const std::string url, 
const Direction directi
   }
 
   if (payload.getOperation() == Operation::TRANSFER) {
-    file_callback = std::unique_ptr<utils::ByteOutputCallback>(new 
utils::ByteOutputCallback(std::numeric_limits<size_t>::max()));
+    file_callback = 
std::make_unique<utils::ByteOutputCallback>(std::numeric_limits<size_t>::max());
     read.pos = 0;
     read.ptr = file_callback.get();
     client.setReadCallback(&read);
diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp 
b/extensions/http-curl/tests/C2PauseResumeTest.cpp
index d4cbbf83c..bfb69fd18 100644
--- a/extensions/http-curl/tests/C2PauseResumeTest.cpp
+++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp
@@ -131,7 +131,8 @@ int main(int argc, char **argv) {
     test_repo, test_repo, content_repo, stream_factory, configuration, 
args.test_file);
 
   std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(
-      test_repo, test_flow_repo, configuration, std::move(yaml_ptr), 
content_repo, DEFAULT_ROOT_GROUP_NAME);
+      test_repo, test_flow_repo, configuration, std::move(yaml_ptr), 
content_repo, DEFAULT_ROOT_GROUP_NAME,
+      std::make_shared<utils::file::FileSystem>(), []{});
 
   core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory, configuration, args.test_file);
 
diff --git a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp 
b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
index 6d96de6ba..d92497c0b 100644
--- a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
+++ b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp
@@ -80,7 +80,12 @@ class C2HeartbeatHandler : public ServerAwareHandler {
 
 class VerifyPropertyUpdate : public HTTPIntegrationBase {
  public:
+  VerifyPropertyUpdate() :fn_{[]{}} {}
   explicit VerifyPropertyUpdate(std::function<void()> fn) : fn_(std::move(fn)) 
{}
+  VerifyPropertyUpdate(const VerifyPropertyUpdate&) = delete;
+  VerifyPropertyUpdate(VerifyPropertyUpdate&&) = default;
+  VerifyPropertyUpdate& operator=(const VerifyPropertyUpdate&) = delete;
+  VerifyPropertyUpdate& operator=(VerifyPropertyUpdate&&) = default;
 
   void testSetup() {}
 
@@ -89,6 +94,8 @@ class VerifyPropertyUpdate : public HTTPIntegrationBase {
   }
 
   std::function<void()> fn_;
+
+  [[nodiscard]] int getRestartRequestedCount() const noexcept { return 
restart_requested_count_; }
 };
 
 static const std::string properties_file =
@@ -96,7 +103,7 @@ static const std::string properties_file =
     "nifi.c2.agent.protocol.class=RESTSender\n"
     "nifi.c2.enable=true\n"
     "nifi.c2.agent.class=test\n"
-    "nifi.c2.agent.heartbeat.period=100\n";
+    "nifi.c2.agent.heartbeat.period=500\n";
 
 static const std::string log_properties_file =
     "logger.root=INFO,ostream\n";
@@ -151,10 +158,18 @@ int main() {
     assert(!log_test_controller->contains("DummyClass3::before", 0s));
   }
 
-  VerifyPropertyUpdate harness([&] {
+  // On msvc, the passed lambda can't capture a reference to the object under 
construction, so we need to late-init harness.
+  VerifyPropertyUpdate harness;
+  harness = VerifyPropertyUpdate([&] {
     assert(utils::verifyEventHappenedInPollTime(3s, [&] {return 
ack_handler.isAcknowledged("79");}));
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return 
ack_handler.getApplyCount("FULLY_APPLIED") == 1;}));
-    assert(utils::verifyEventHappenedInPollTime(3s, [&] {return 
ack_handler.getApplyCount("NO_OPERATION") > 0;}));
+    assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+      return ack_handler.getApplyCount("FULLY_APPLIED") == 1
+          && harness.getRestartRequestedCount() == 1;
+    }));
+    assert(utils::verifyEventHappenedInPollTime(3s, [&] {
+      return ack_handler.getApplyCount("NO_OPERATION") > 0
+          && harness.getRestartRequestedCount() == 1;  // only one, i.e. no 
additional restart requests compared to the previous update.
+    }));
     // update operation acknowledged
     {
       // verify final log levels
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp 
b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index 839769788..72669b64b 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -69,13 +69,15 @@ int main(int argc, char **argv) {
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, args.test_file));
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::make_unique<core::YamlConfiguration>(
+      test_repo, test_repo, content_repo, stream_factory, configuration, 
args.test_file);
   std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
 
-  std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, 
configuration, std::move(yaml_ptr),
-                                                                               
                 content_repo,
-                                                                               
                 DEFAULT_ROOT_GROUP_NAME);
+  const auto controller = std::make_shared<minifi::FlowController>(test_repo, 
test_flow_repo, configuration, std::move(yaml_ptr),
+      content_repo,
+      DEFAULT_ROOT_GROUP_NAME,
+      std::make_shared<utils::file::FileSystem>(),
+      []{});
 
   disabled = false;
   std::shared_ptr<core::controller::ControllerServiceMap> map = 
std::make_shared<core::controller::ControllerServiceMap>();
diff --git a/extensions/http-curl/tests/HTTPHandlers.h 
b/extensions/http-curl/tests/HTTPHandlers.h
index 92478a00a..7bae42aed 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -476,7 +476,7 @@ class HeartbeatHandler : public ServerAwareHandler {
           classes.push_back(proc["type"].GetString());
         }
 
-        auto group = minifi::BuildDescription::getClassDescriptions(str);
+        auto group = minifi::BuildDescription{}.getClassDescriptions(str);
         for (const auto& proc : group.processors_) {
           assert(std::find(classes.begin(), classes.end(), proc.class_name_) 
!= std::end(classes));
           (void)proc;
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h 
b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 1136f217b..6870a14fb 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -47,6 +47,10 @@ class HTTPIntegrationBase : public IntegrationBase {
       : IntegrationBase(waitTime),
         server(nullptr) {
   }
+  HTTPIntegrationBase(const HTTPIntegrationBase&) = delete;
+  HTTPIntegrationBase(HTTPIntegrationBase&&) = default;
+  HTTPIntegrationBase& operator=(const HTTPIntegrationBase&) = delete;
+  HTTPIntegrationBase& operator=(HTTPIntegrationBase&&) = default;
 
   virtual void setUrl(const std::string &url, ServerAwareHandler *handler);
 
diff --git a/extensions/systemd/CMakeLists.txt 
b/extensions/systemd/CMakeLists.txt
index 1cb668ba5..41bd52bf1 100644
--- a/extensions/systemd/CMakeLists.txt
+++ b/extensions/systemd/CMakeLists.txt
@@ -19,7 +19,7 @@
 
 include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
-add_library(minifi-systemd SHARED ConsumeJournald.cpp WorkerThread.cpp 
libwrapper/LibWrapper.cpp libwrapper/DlopenWrapper.cpp)
+add_library(minifi-systemd SHARED ConsumeJournald.cpp 
libwrapper/LibWrapper.cpp libwrapper/DlopenWrapper.cpp)
 
 target_link_libraries(minifi-systemd ${LIBMINIFI} Threads::Threads date::date)
 
diff --git a/extensions/systemd/ConsumeJournald.cpp 
b/extensions/systemd/ConsumeJournald.cpp
index 94f3ca8b1..98e800ae2 100644
--- a/extensions/systemd/ConsumeJournald.cpp
+++ b/extensions/systemd/ConsumeJournald.cpp
@@ -81,7 +81,7 @@ void ConsumeJournald::initialize() {
   setSupportedProperties({BatchSize, PayloadFormat, IncludeTimestamp, 
JournalType, ProcessOldMessages, TimestampFormat});
   setSupportedRelationships({Success});
 
-  worker_ = std::make_unique<Worker>();
+  worker_ = std::make_unique<utils::FifoExecutor>();
 }
 
 void ConsumeJournald::notifyStop() {
diff --git a/extensions/systemd/ConsumeJournald.h 
b/extensions/systemd/ConsumeJournald.h
index a96c07e03..830e657e2 100644
--- a/extensions/systemd/ConsumeJournald.h
+++ b/extensions/systemd/ConsumeJournald.h
@@ -35,7 +35,7 @@
 #include "libwrapper/LibWrapper.h"
 #include "utils/Deleters.h"
 #include "utils/gsl.h"
-#include "WorkerThread.h"
+#include "utils/FifoExecutor.h"
 
 namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd {
 
@@ -97,7 +97,7 @@ class ConsumeJournald final : public core::Processor {
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<ConsumeJournald>::getLogger();
   core::CoreComponentStateManager* state_manager_;
   std::unique_ptr<libwrapper::LibWrapper> libwrapper_;
-  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<utils::FifoExecutor> worker_;
   std::unique_ptr<libwrapper::Journal> journal_;
 
   std::size_t batch_size_ = 1000;
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index 0d5f6d7cd..92fa9aeb6 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -22,6 +22,7 @@
 
 #include <algorithm>
 #include <atomic>
+#include <functional>
 #include <map>
 #include <memory>
 #include <mutex>
@@ -57,10 +58,7 @@
 #include "utils/Id.h"
 #include "utils/file/FileSystem.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 namespace state {
 class ProcessorController;
@@ -77,12 +75,14 @@ class FlowController : public 
core::controller::ForwardingControllerServiceProvi
  public:
   FlowController(std::shared_ptr<core::Repository> provenance_repo, 
std::shared_ptr<core::Repository> flow_file_repo,
                  std::shared_ptr<Configure> configure, 
std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                 std::shared_ptr<core::ContentRepository> content_repo, 
std::string name = DEFAULT_ROOT_GROUP_NAME,
-                 std::shared_ptr<utils::file::FileSystem> filesystem = 
std::make_shared<utils::file::FileSystem>());
+                 std::shared_ptr<core::ContentRepository> content_repo, const 
std::string& name = DEFAULT_ROOT_GROUP_NAME,
+                 std::shared_ptr<utils::file::FileSystem> filesystem = 
std::make_shared<utils::file::FileSystem>(),
+                 std::function<void()> request_restart = []{});
 
   FlowController(std::shared_ptr<core::Repository> provenance_repo, 
std::shared_ptr<core::Repository> flow_file_repo,
                  std::shared_ptr<Configure> configure, 
std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                 std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<utils::file::FileSystem> filesystem);
+                 std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<utils::file::FileSystem> filesystem,
+                 std::function<void()> request_restart = []{});
 
   ~FlowController() override;
 
@@ -181,7 +181,7 @@ class FlowController : public 
core::controller::ForwardingControllerServiceProvi
    * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE 
manifest
    * @return the agent manifest response node
    */
-  std::shared_ptr<state::response::ResponseNode> getAgentManifest() const 
override;
+  std::shared_ptr<state::response::ResponseNode> getAgentManifest() override;
 
   uint64_t getUptime() override;
 
@@ -254,9 +254,6 @@ class FlowController : public 
core::controller::ForwardingControllerServiceProvi
   std::map<utils::Identifier, std::unique_ptr<state::ProcessorController>> 
processor_to_controller_;
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
 
 #endif  // LIBMINIFI_INCLUDE_FLOWCONTROLLER_H_
diff --git a/libminifi/include/agent/build_description.h 
b/libminifi/include/agent/build_description.h
index 7eec05062..6017fd71f 100644
--- a/libminifi/include/agent/build_description.h
+++ b/libminifi/include/agent/build_description.h
@@ -32,10 +32,7 @@
 #include "core/Annotation.h"
 #include "io/validation.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 class ClassDescription {
  public:
@@ -71,6 +68,10 @@ struct Components {
   std::vector<ClassDescription> processors_;
   std::vector<ClassDescription> controller_services_;
   std::vector<ClassDescription> other_components_;
+
+  [[nodiscard]] bool empty() const noexcept {
+    return processors_.empty() && controller_services_.empty() && 
other_components_.empty();
+  }
 };
 
 struct BundleDetails {
@@ -121,13 +122,8 @@ class ExternalBuildDescription {
 
 class BuildDescription {
  public:
-  static struct Components getClassDescriptions(const std::string& group = 
"minifi-system") {
-    static std::map<std::string, struct Components> class_mappings;
-#ifndef WIN32
-    if (UNLIKELY(IsNullOrEmpty(class_mappings[group].processors_) && 
IsNullOrEmpty(class_mappings[group].processors_))) {
-#else
-      if (class_mappings[group].processors_.empty()) {
-#endif
+  struct Components getClassDescriptions(const std::string& group = 
"minifi-system") {
+    if (class_mappings_[group].empty()) {
       for (const auto& clazz : 
core::ClassLoader::getDefaultClassLoader().getClasses(group)) {
         std::string class_name = clazz;
         auto lastOfIdx = clazz.find_last_of("::");
@@ -158,22 +154,22 @@ class BuildDescription {
             description.inputRequirement_ = 
processor->getInputRequirementAsString();
             description.isSingleThreaded_ = processor->isSingleThreaded();
             description.class_relationships_ = 
processor->getSupportedRelationships();
-            class_mappings[group].processors_.emplace_back(description);
+            class_mappings_[group].processors_.emplace_back(description);
           } else if (is_controller_service) {
-            
class_mappings[group].controller_services_.emplace_back(description);
+            
class_mappings_[group].controller_services_.emplace_back(description);
           } else {
-            class_mappings[group].other_components_.emplace_back(description);
+            class_mappings_[group].other_components_.emplace_back(description);
           }
         }
       }
     }
-    return class_mappings[group];
+    return class_mappings_[group];
   }
-}; // NOLINT
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+ private:
+  std::map<std::string, struct Components> class_mappings_;
+};
+
+}  // namespace org::apache::nifi::minifi
 
 #endif  // LIBMINIFI_INCLUDE_AGENT_BUILD_DESCRIPTION_H_
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 38b33c03a..b32b029f6 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -68,8 +68,9 @@ class C2Agent : public state::UpdateController {
   C2Agent(core::controller::ControllerServiceProvider *controller,
           state::Pausable *pause_handler,
           state::StateMonitor* updateSink,
-          const std::shared_ptr<Configure> &configure,
-          const std::shared_ptr<utils::file::FileSystem> &filesystem = 
std::make_shared<utils::file::FileSystem>());
+          std::shared_ptr<Configure> configure,
+          std::shared_ptr<utils::file::FileSystem> filesystem,
+          std::function<void()> request_restart);
 
   ~C2Agent() noexcept override {
     delete protocol_.load();
@@ -93,8 +94,6 @@ class C2Agent : public state::UpdateController {
   std::optional<std::string> fetchFlow(const std::string& uri) const;
 
  protected:
-  void restart_agent();
-
   /**
    * Check the collection of triggers for any updates that need to be handled.
    * This is an optional step
@@ -246,6 +245,9 @@ class C2Agent : public state::UpdateController {
   bool manifest_sent_;
 
   const uint64_t C2RESPONSE_POLL_MS = 100;
+
+  std::atomic<bool> restart_needed_ = false;
+  std::function<void()> request_restart_;
 };
 
 }  // namespace c2
diff --git a/libminifi/include/c2/C2Client.h b/libminifi/include/c2/C2Client.h
index 14d4dcdd1..7180b281e 100644
--- a/libminifi/include/c2/C2Client.h
+++ b/libminifi/include/c2/C2Client.h
@@ -36,11 +36,7 @@
 #include "core/Flow.h"
 #include "utils/file/FileSystem.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
+namespace org::apache::nifi::minifi::c2 {
 
 class C2Client : public core::Flow, public state::response::NodeReporter {
  public:
@@ -48,6 +44,7 @@ class C2Client : public core::Flow, public 
state::response::NodeReporter {
       std::shared_ptr<Configure> configuration, 
std::shared_ptr<core::Repository> provenance_repo,
       std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::ContentRepository> content_repo,
       std::unique_ptr<core::FlowConfiguration> flow_configuration, 
std::shared_ptr<utils::file::FileSystem> filesystem,
+      std::function<void()> request_restart,
       std::shared_ptr<core::logging::Logger> logger = 
core::logging::LoggerFactory<C2Client>::getLogger());
 
   void initialize(core::controller::ControllerServiceProvider *controller, 
state::Pausable *pause_handler, state::StateMonitor* update_sink);
@@ -84,10 +81,7 @@ class C2Client : public core::Flow, public 
state::response::NodeReporter {
 
  protected:
   std::atomic<bool> flow_update_{false};
+  std::function<void()> request_restart_;
 };
 
-}  // namespace c2
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index dfb21d457..badc5e0eb 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -230,7 +230,7 @@ class C2Payload : public state::Update {
 
   friend std::ostream& operator<<(std::ostream& out, const C2Payload& payload);
 
-  std::string str() const {
+  [[nodiscard]] std::string str() const {
     std::stringstream ss;
     ss << *this;
     return std::move(ss).str();
diff --git a/libminifi/include/core/state/Value.h 
b/libminifi/include/core/state/Value.h
index 230eb6f65..6d8b77219 100644
--- a/libminifi/include/core/state/Value.h
+++ b/libminifi/include/core/state/Value.h
@@ -24,18 +24,15 @@
 #include <iostream>
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 #include <typeinfo>
 #include "utils/ValueParser.h"
 #include "utils/ValueCaster.h"
 #include "utils/Export.h"
+#include "utils/meta/type_list.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::state::response {
 
 /**
  * Purpose: Represents an AST value
@@ -48,17 +45,18 @@ class Value {
   using ParseException = utils::internal::ParseException;
 
  public:
-  explicit Value(const std::string &value)
-      : string_value(value),
+  explicit Value(std::string value)
+      : string_value(std::move(value)),
         type_id(std::type_index(typeid(std::string))) {
   }
 
   virtual ~Value() = default;
-  std::string getStringValue() const {
+
+  [[nodiscard]] std::string getStringValue() const {
     return string_value;
   }
 
-  const char* c_str() const {
+  [[nodiscard]] const char* c_str() const {
     return string_value.c_str();
   }
 
@@ -67,7 +65,7 @@ class Value {
     return convertValueImpl<typename std::common_type<T>::type>(ref);
   }
 
-  bool empty() {
+  [[nodiscard]] bool empty() const noexcept {
     return string_value.empty();
   }
 
@@ -178,7 +176,7 @@ class UInt32Value : public Value {
     setTypeId<uint32_t>();
   }
 
-  uint32_t getValue() const {
+  [[nodiscard]] uint32_t getValue() const {
     return value;
   }
 
@@ -210,7 +208,7 @@ class UInt32Value : public Value {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  uint32_t value;
+  uint32_t value{};
 };
 
 class IntValue : public Value {
@@ -225,7 +223,7 @@ class IntValue : public Value {
       : Value(strvalue) {
     utils::internal::ValueParser(strvalue).parse(value).parseEnd();
   }
-  int getValue() const {
+  [[nodiscard]] int getValue() const {
     return value;
   }
 
@@ -256,7 +254,7 @@ class IntValue : public Value {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  int value;
+  int value{};
 };
 
 class BoolValue : public Value {
@@ -272,7 +270,7 @@ class BoolValue : public Value {
     utils::internal::ValueParser(strvalue).parse(value).parseEnd();
   }
 
-  bool getValue() const {
+  [[nodiscard]] bool getValue() const {
     return value;
   }
 
@@ -302,7 +300,7 @@ class BoolValue : public Value {
     return true;
   }
 
-  bool value;
+  bool value{};
 
  private:
   template<typename T>
@@ -329,7 +327,7 @@ class UInt64Value : public Value {
     setTypeId<uint64_t>();
   }
 
-  uint64_t getValue() const {
+  [[nodiscard]] uint64_t getValue() const {
     return value;
   }
 
@@ -358,7 +356,7 @@ class UInt64Value : public Value {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  uint64_t value;
+  uint64_t value{};
 };
 
 class Int64Value : public Value {
@@ -374,7 +372,7 @@ class Int64Value : public Value {
     setTypeId<int64_t>();
   }
 
-  int64_t getValue() {
+  [[nodiscard]] int64_t getValue() const {
     return value;
   }
 
@@ -404,7 +402,7 @@ class Int64Value : public Value {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  int64_t value;
+  int64_t value{};
 };
 
 class DoubleValue : public Value {
@@ -420,37 +418,37 @@ class DoubleValue : public Value {
     setTypeId<double>();
   }
 
-  double getValue() {
+  [[nodiscard]] double getValue() const {
     return value;
   }
 
  protected:
-  virtual bool getValue(int& ref) {
+  bool getValue(int& ref) override {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  virtual bool getValue(uint32_t& ref) {
+  bool getValue(uint32_t& ref) override {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  virtual bool getValue(int64_t& ref ) {
+  bool getValue(int64_t& ref) override {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  virtual bool getValue(uint64_t& ref) {
+  bool getValue(uint64_t& ref) override {
     return utils::internal::cast_if_in_range(value, ref);
   }
 
-  virtual bool getValue(bool&) {
+  bool getValue(bool&) override {
     return false;
   }
 
-  virtual bool getValue(double& ref) {
+  bool getValue(double& ref) override {
     ref = value;
     return true;
   }
 
-  double value;
+  double value{};
 };
 
 static inline std::shared_ptr<Value> createValue(const bool &object) {
@@ -497,56 +495,50 @@ static inline std::shared_ptr<Value> createValue(const 
double &object) {
  * Purpose: ValueNode is the AST container for a value
  */
 class ValueNode {
+  using supported_types = utils::meta::type_list<int, uint32_t, size_t, 
int64_t, uint64_t, bool, char*, const char*, double, std::string>;
+
  public:
-  ValueNode()
-      : value_(nullptr) {
-  }
+  ValueNode() = default;
 
-  ValueNode(ValueNode &&vn) = default;
-  ValueNode(const ValueNode &vn) = default;
+  template<typename T>
+  requires (supported_types::contains<T>())  // NOLINT
+  /* implicit, because it doesn't change the meaning, and it simplifies 
construction of maps */
+  ValueNode(const T value)  // NOLINT
+      :value_{createValue(value)}
+  {}
 
   /**
    * Define the representations and eventual storage relationships through
    * createValue
    */
   template<typename T>
-  auto operator=(const T ref) -> typename std::enable_if<std::is_same<T, int 
>::value ||
-  std::is_same<T, uint32_t >::value ||
-  std::is_same<T, size_t >::value ||
-  std::is_same<T, int64_t>::value ||
-  std::is_same<T, uint64_t >::value ||
-  std::is_same<T, bool >::value ||
-  std::is_same<T, char* >::value ||
-  std::is_same<T, const char* >::value ||
-  std::is_same<T, double>::value ||
-  std::is_same<T, std::string>::value, ValueNode&>::type {
+  requires (supported_types::contains<T>())  // NOLINT
+  ValueNode& operator=(const T ref) {
     value_ = createValue(ref);
     return *this;
   }
 
-  ValueNode &operator=(const ValueNode &ref) = default;
-
   inline bool operator==(const ValueNode &rhs) const {
     return to_string() == rhs.to_string();
   }
 
-  inline bool operator==(const char*rhs) const {
+  inline bool operator==(const char* rhs) const {
     return to_string() == rhs;
   }
 
-  friend bool operator==(const char *lhs, const ValueNode& rhs) {
+  friend bool operator==(const char* lhs, const ValueNode& rhs) {
     return lhs == rhs.to_string();
   }
 
-  std::string to_string() const {
+  [[nodiscard]] std::string to_string() const {
     return value_ ? value_->getStringValue() : "";
   }
 
-  std::shared_ptr<Value> getValue() const {
+  [[nodiscard]] std::shared_ptr<Value> getValue() const {
     return value_;
   }
 
-  bool empty() const {
+  [[nodiscard]] bool empty() const noexcept {
     return value_ == nullptr || value_->empty();
   }
 
@@ -556,33 +548,23 @@ class ValueNode {
 
 struct SerializedResponseNode {
   std::string name;
-  ValueNode value;
-  bool array;
-  bool collapsible;
+  ValueNode value{};
+  bool array = false;
+  bool collapsible = true;
   bool keep_empty = false;
-  std::vector<SerializedResponseNode> children;
-
-  SerializedResponseNode(bool collapsible = true) // NOLINT
-      : array(false),
-        collapsible(collapsible) {
-  }
+  std::vector<SerializedResponseNode> children{};
 
-  SerializedResponseNode(const SerializedResponseNode &other) = default;
-
-  SerializedResponseNode &operator=(const SerializedResponseNode &other) = 
default;
-
-  bool empty() const {
+  [[nodiscard]] bool empty() const noexcept {
     return value.empty() && children.empty();
   }
+
+  [[nodiscard]] std::string to_string() const;
 };
 
+inline std::string to_string(const SerializedResponseNode& node) { return 
node.to_string(); }
+
 std::string hashResponseNodes(const std::vector<SerializedResponseNode>& 
nodes);
 
-}  // namespace response
-}  // namespace state
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::state::response
 
 #endif  // LIBMINIFI_INCLUDE_CORE_STATE_VALUE_H_
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h 
b/libminifi/include/core/state/nodes/AgentInformation.h
index 904ada129..8520c1e40 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -64,12 +64,7 @@
 #include "utils/Export.h"
 #include "SupportedOperations.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::state::response {
 
 #define GROUP_STR "org.apache.nifi.minifi"
 
@@ -91,7 +86,7 @@ class ComponentManifest : public DeviceInformation {
     std::vector<SerializedResponseNode> serialized;
     SerializedResponseNode resp;
     resp.name = "componentManifest";
-    struct Components group = 
BuildDescription::getClassDescriptions(getName());
+    struct Components group = 
build_description_.getClassDescriptions(getName());
     serializeClassDescription(group.processors_, "processors", resp);
     serializeClassDescription(group.controller_services_, 
"controllerServices", resp);
     serialized.push_back(resp);
@@ -316,6 +311,9 @@ class ComponentManifest : public DeviceInformation {
       response.children.push_back(type);
     }
   }
+
+ private:
+  BuildDescription build_description_;
 };
 
 class ExternalManifest : public ComponentManifest {
@@ -499,11 +497,13 @@ class AgentStatus : public StateMonitorNode {
   }
 
   SerializedResponseNode serializeComponents() const {
-    SerializedResponseNode components_node(false);
+    SerializedResponseNode components_node;
+    components_node.collapsible = false;
     components_node.name = "components";
     if (monitor_ != nullptr) {
       monitor_->executeOnAllComponents([&components_node](StateController& 
component){
-        SerializedResponseNode component_node(false);
+        SerializedResponseNode component_node;
+        component_node.collapsible = false;
         component_node.name = component.getComponentName();
 
         SerializedResponseNode uuid_node;
@@ -585,7 +585,7 @@ class AgentMonitor {
     }
   }
 
-  void setStateMonitor(state::StateMonitor* &monitor) {
+  void setStateMonitor(state::StateMonitor* monitor) {
     monitor_ = monitor;
   }
 
@@ -599,15 +599,15 @@ class AgentMonitor {
  */
 class AgentManifest : public DeviceInformation {
  public:
-  AgentManifest(std::string name, const utils::Identifier& uuid)
-    : DeviceInformation(std::move(name), uuid) {
+  AgentManifest(const std::string& name, const utils::Identifier& uuid)
+    : DeviceInformation(name, uuid) {
   }
 
-  explicit AgentManifest(std::string name)
-    : DeviceInformation(std::move(name)) {
+  explicit AgentManifest(const std::string& name)
+    : DeviceInformation(name) {
   }
 
-  std::string getName() const {
+  std::string getName() const override {
     return "agentManifest";
   }
 
@@ -620,80 +620,38 @@ class AgentManifest : public DeviceInformation {
   }
 
   void setConfigurationReader(std::function<std::optional<std::string>(const 
std::string&)> configuration_reader) {
-    configuration_reader_ = configuration_reader;
-  }
-
-  std::vector<SerializedResponseNode> serialize() {
-    static std::vector<SerializedResponseNode> serialized;
-    if (serialized.empty()) {
-      SerializedResponseNode ident;
-
-      ident.name = "identifier";
-      ident.value = AgentBuild::BUILD_IDENTIFIER;
-
-      SerializedResponseNode type;
-
-      type.name = "agentType";
-      type.value = "cpp";
-
-      SerializedResponseNode version;
-
-      version.name = "version";
-      version.value = AgentBuild::VERSION;
-
-      SerializedResponseNode buildInfo;
-      buildInfo.name = "buildInfo";
-
-      SerializedResponseNode build_version;
-      build_version.name = "version";
-      build_version.value = AgentBuild::VERSION;
-
-      SerializedResponseNode build_rev;
-      build_rev.name = "revision";
-      build_rev.value = AgentBuild::BUILD_REV;
-
-      SerializedResponseNode build_date;
-      build_date.name = "timestamp";
-      build_date.value = (uint64_t) std::stoull(AgentBuild::BUILD_DATE);
-
-      SerializedResponseNode compiler_command;
-      compiler_command.name = "compiler";
-      compiler_command.value = AgentBuild::COMPILER;
-
-      SerializedResponseNode compiler_flags;
-      compiler_flags.name = "flags";
-      compiler_flags.value = AgentBuild::COMPILER_FLAGS;
-
-      buildInfo.children.push_back(compiler_flags);
-      buildInfo.children.push_back(compiler_command);
-
-      buildInfo.children.push_back(build_version);
-      buildInfo.children.push_back(build_rev);
-      buildInfo.children.push_back(build_date);
-
-      Bundles bundles("bundles");
-
-      serialized.push_back(ident);
-      serialized.push_back(type);
-      serialized.push_back(buildInfo);
-      // serialize the bundle information.
-      for (auto bundle : bundles.serialize()) {
-        serialized.push_back(bundle);
-      }
-
-      SchedulingDefaults defaults("schedulingDefaults");
-
-      for (auto defaultNode : defaults.serialize()) {
-        serialized.push_back(defaultNode);
-      }
-
-      SupportedOperations supported_operations("supportedOperations");
-      supported_operations.setStateMonitor(monitor_);
-      
supported_operations.setUpdatePolicyController(update_policy_controller_);
-      supported_operations.setConfigurationReader(configuration_reader_);
-      for (const auto& operation : supported_operations.serialize()) {
-        serialized.push_back(operation);
-      }
+    configuration_reader_ = std::move(configuration_reader);
+  }
+
+  std::vector<SerializedResponseNode> serialize() override {
+    std::vector<SerializedResponseNode> serialized = {
+        {.name = "identifier", .value = AgentBuild::BUILD_IDENTIFIER},
+        {.name = "agentType", .value = "cpp"},
+        {.name = "buildInfo", .children = {
+            {.name = "flags", .value = AgentBuild::COMPILER_FLAGS},
+            {.name = "compiler", .value = AgentBuild::COMPILER},
+            {.name = "version", .value = AgentBuild::VERSION},
+            {.name = "revision", .value = AgentBuild::BUILD_REV},
+            {.name = "timestamp", .value = 
static_cast<uint64_t>(std::stoull(AgentBuild::BUILD_DATE))}
+        }}
+    };
+    {
+      auto bundles = Bundles{"bundles"}.serialize();
+      std::move(std::begin(bundles), std::end(bundles), 
std::back_inserter(serialized));
+    }
+    {
+      auto schedulingDefaults = 
SchedulingDefaults{"schedulingDefaults"}.serialize();
+      std::move(std::begin(schedulingDefaults), std::end(schedulingDefaults), 
std::back_inserter(serialized));
+    }
+    {
+      auto supportedOperations = [this]() {
+        SupportedOperations supported_operations("supportedOperations");
+        supported_operations.setStateMonitor(monitor_);
+        
supported_operations.setUpdatePolicyController(update_policy_controller_);
+        supported_operations.setConfigurationReader(configuration_reader_);
+        return supported_operations.serialize();
+      }();
+      std::move(std::begin(supportedOperations), 
std::end(supportedOperations), std::back_inserter(serialized));
     }
     return serialized;
   }
@@ -721,52 +679,42 @@ class AgentNode : public DeviceInformation, public 
AgentMonitor, public AgentIde
   }
 
   void setConfigurationReader(std::function<std::optional<std::string>(const 
std::string&)> configuration_reader) {
-    configuration_reader_ = configuration_reader;
+    configuration_reader_ = std::move(configuration_reader);
   }
 
  protected:
-  std::vector<SerializedResponseNode> serialize() {
-    std::vector<SerializedResponseNode> serialized;
-
-    SerializedResponseNode ident;
-
-    ident.name = "identifier";
-    ident.value = provider_->getAgentIdentifier();
-    serialized.push_back(ident);
+  std::vector<SerializedResponseNode> serialize() override {
+    std::vector<SerializedResponseNode> serialized = {
+        {.name = "identifier", .value = provider_->getAgentIdentifier()},
+    };
 
     const auto agent_class = provider_->getAgentClass();
     if (agent_class) {
-      SerializedResponseNode agentClass;
-      agentClass.name = "agentClass";
-      agentClass.value = *agent_class;
-      serialized.push_back(agentClass);
+      serialized.push_back({.name = "agentClass", .value = *agent_class});
     }
 
-    SerializedResponseNode agentManifestHash;
-    agentManifestHash.name = "agentManifestHash";
-    agentManifestHash.value = getAgentManifestHash();
-    serialized.push_back(agentManifestHash);
-
+    serialized.push_back({.name = "agentManifestHash", .value = 
getAgentManifestHash()});
     return serialized;
   }
 
   std::vector<SerializedResponseNode> getAgentManifest() const {
-    SerializedResponseNode agentManifest;
-    agentManifest.name = "agentManifest";
-    AgentManifest manifest{"manifest"};
-    manifest.setStateMonitor(monitor_);
-    manifest.setUpdatePolicyController(update_policy_controller_);
-    manifest.setConfigurationReader(configuration_reader_);
-    agentManifest.children = manifest.serialize();
-    return std::vector<SerializedResponseNode>{ agentManifest };
-  }
-
-  std::string getAgentManifestHash() {
-    if (!agentManifestHash_.has_value()) {
-      agentManifestHash_ = hashResponseNodes(getAgentManifest());
+    if (agent_manifest_cache_) { return std::vector{*agent_manifest_cache_}; }
+    agent_manifest_cache_ = {.name = "agentManifest", .children = [this] {
+      AgentManifest manifest{"manifest"};
+      manifest.setStateMonitor(monitor_);
+      manifest.setUpdatePolicyController(update_policy_controller_);
+      manifest.setConfigurationReader(configuration_reader_);
+      return manifest.serialize();
+    }()};
+    agent_manifest_hash_cache_.clear();
+    return std::vector{ *agent_manifest_cache_ };
+  }
+
+  std::string getAgentManifestHash() const {
+    if (agent_manifest_hash_cache_.empty()) {
+      agent_manifest_hash_cache_ = hashResponseNodes(getAgentManifest());
     }
-
-    return *agentManifestHash_;
+    return agent_manifest_hash_cache_;
   }
 
   std::vector<SerializedResponseNode> getAgentStatus() const {
@@ -787,9 +735,11 @@ class AgentNode : public DeviceInformation, public 
AgentMonitor, public AgentIde
   }
 
  private:
-  std::optional<std::string> agentManifestHash_;
+  mutable std::optional<SerializedResponseNode> agent_manifest_cache_;
+  mutable std::string agent_manifest_hash_cache_;
   controllers::UpdatePolicyControllerService* update_policy_controller_ = 
nullptr;
   std::function<std::optional<std::string>(const std::string&)> 
configuration_reader_;
+  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<AgentNode>::getLogger();
 };
 
 /**
@@ -811,7 +761,7 @@ class AgentInformation : public AgentNode {
     setArray(false);
   }
 
-  std::string getName() const {
+  std::string getName() const override {
     return "agentInfo";
   }
 
@@ -819,7 +769,7 @@ class AgentInformation : public AgentNode {
     include_agent_status_ = include;
   }
 
-  std::vector<SerializedResponseNode> serialize() {
+  std::vector<SerializedResponseNode> serialize() override {
     std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
     if (include_agent_manifest_) {
       auto manifest = getAgentManifest();
@@ -837,11 +787,6 @@ class AgentInformation : public AgentNode {
   bool include_agent_status_;
 };
 
-}  // namespace response
-}  // namespace state
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::state::response
 
 #endif  // LIBMINIFI_INCLUDE_CORE_STATE_NODES_AGENTINFORMATION_H_
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h 
b/libminifi/include/core/state/nodes/FlowInformation.h
index b10e504fb..aae2e355c 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -194,11 +194,13 @@ class FlowInformation : public FlowMonitor {
     serialized.push_back(uri);
 
     if (!connections_.empty()) {
-      SerializedResponseNode queues(false);
+      SerializedResponseNode queues;
+      queues.collapsible = false;
       queues.name = "queues";
 
       for (auto &queue : connections_) {
-        SerializedResponseNode repoNode(false);
+        SerializedResponseNode repoNode;
+        repoNode.collapsible = false;
         repoNode.name = queue.second->getName();
 
         SerializedResponseNode queueUUIDNode;
@@ -233,11 +235,13 @@ class FlowInformation : public FlowMonitor {
     }
 
     if (nullptr != monitor_) {
-      SerializedResponseNode componentsNode(false);
+      SerializedResponseNode componentsNode;
+      componentsNode.collapsible = false;
       componentsNode.name = "components";
 
       monitor_->executeOnAllComponents([&componentsNode](StateController& 
component){
-        SerializedResponseNode componentNode(false);
+        SerializedResponseNode componentNode;
+        componentNode.collapsible = false;
         componentNode.name = component.getComponentName();
 
         SerializedResponseNode uuidNode;
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h 
b/libminifi/include/core/state/nodes/MetricsBase.h
index e0af1043a..d16c2bcc4 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -207,7 +207,7 @@ class NodeReporter {
    * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE 
manifest
    * @return the agent manifest response node
    */
-  virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() 
const = 0;
+  virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() = 
0;
 };
 
 /**
diff --git a/libminifi/include/properties/Configuration.h 
b/libminifi/include/properties/Configuration.h
index d0b7343ae..f7db69cd4 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -107,6 +107,7 @@ class Configuration : public Properties {
   static constexpr const char *nifi_c2_agent_coap_port = 
"nifi.c2.agent.coap.port";
   static constexpr const char *nifi_c2_agent_protocol_class = 
"nifi.c2.agent.protocol.class";
   static constexpr const char *nifi_c2_agent_identifier = 
"nifi.c2.agent.identifier";
+  static constexpr const char *nifi_c2_agent_identifier_fallback = 
"nifi.c2.agent.identifier.fallback";
   static constexpr const char *nifi_c2_agent_trigger_classes = 
"nifi.c2.agent.trigger.classes";
   static constexpr const char *nifi_c2_root_classes = "nifi.c2.root.classes";
   static constexpr const char *nifi_c2_root_class_definitions = 
"nifi.c2.root.class.definitions";
diff --git a/extensions/systemd/WorkerThread.h 
b/libminifi/include/utils/FifoExecutor.h
similarity index 81%
rename from extensions/systemd/WorkerThread.h
rename to libminifi/include/utils/FifoExecutor.h
index 503fcb1b4..1aca59d56 100644
--- a/extensions/systemd/WorkerThread.h
+++ b/libminifi/include/utils/FifoExecutor.h
@@ -23,7 +23,7 @@
 
 #include "utils/MinifiConcurrentQueue.h"
 
-namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd {
+namespace org::apache::nifi::minifi::utils {
 
 namespace detail {
 class WorkerThread final {
@@ -48,9 +48,9 @@ class WorkerThread final {
 }  // namespace detail
 
 /**
- * A worker that executes arbitrary functions with no parameters 
asynchronously on an internal thread, returning a future to the result.
+ * Executes arbitrary functions with no parameters asynchronously on an 
internal thread, returning a future to the result.
  */
-class Worker final {
+class FifoExecutor final {
  public:
   template<typename Func>
   auto enqueue(Func func) -> std::future<decltype(func())> {
@@ -64,9 +64,4 @@ class Worker final {
   detail::WorkerThread worker_thread_;
 };
 
-}  // namespace systemd
-}  // namespace extensions
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/SmallString.h 
b/libminifi/include/utils/SmallString.h
index 94259c197..31c149e32 100644
--- a/libminifi/include/utils/SmallString.h
+++ b/libminifi/include/utils/SmallString.h
@@ -34,6 +34,10 @@ class SmallString : public std::array<char, N + 1> {
     return {c_str()};
   }
 
+  [[nodiscard]] std::string_view view() const noexcept {
+    return std::string_view{this->data(), N};
+  }
+
   constexpr size_t length() const noexcept {
     return N;
   }
diff --git a/libminifi/include/utils/file/FileUtils.h 
b/libminifi/include/utils/file/FileUtils.h
index 75ad4e833..c4b1b68cb 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -607,12 +607,14 @@ inline std::error_code hide_file(const char* const 
file_name) {
 
 uint64_t computeChecksum(const std::string &file_name, uint64_t 
up_to_position);
 
-inline std::string get_file_content(const std::string &file_name) {
-  std::ifstream file(file_name);
+inline std::string get_content(const std::string &file_name) {
+  std::ifstream file(file_name, std::ifstream::binary);
   std::string content((std::istreambuf_iterator<char>(file)), 
std::istreambuf_iterator<char>());
   return content;
 }
 
+void put_content(const std::filesystem::path& filename, std::string_view 
new_contents);
+
 bool contains(const std::filesystem::path& file_path, std::string_view 
text_to_search);
 
 
diff --git a/main/AgentDocs.h b/libminifi/include/utils/meta/type_list.h
similarity index 59%
copy from main/AgentDocs.h
copy to libminifi/include/utils/meta/type_list.h
index 0402adf11..a94899abd 100644
--- a/main/AgentDocs.h
+++ b/libminifi/include/utils/meta/type_list.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,30 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef MAIN_AGENTDOCS_H_
-#define MAIN_AGENTDOCS_H_
-
-#include <iostream>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace docs {
+#pragma once
+#include <type_traits>
 
-class AgentDocs {
- public:
-  AgentDocs() = default;
-  ~AgentDocs() = default;
-  void generate(const std::string &docsdir, std::ostream &genStream);
- private:
-  inline std::string extractClassName(const std::string &processor) const;
+namespace org::apache::nifi::minifi::utils::meta {
+template<typename... Types>
+struct type_list {
+  template<typename T>
+  [[nodiscard]] constexpr static bool contains() noexcept {
+    return (std::is_same_v<T, Types> || ...);
+  }
 };
-
-} /* namespace docs */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif  // MAIN_AGENTDOCS_H_
+}  // namespace org::apache::nifi::minifi::utils::meta
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 78e3e36af..4e50b2447 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -83,6 +83,7 @@ const std::vector<core::ConfigurationProperty> 
Configuration::CONFIGURATION_PROP
   core::ConfigurationProperty{Configuration::nifi_c2_agent_coap_port, 
gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_c2_agent_protocol_class},
   core::ConfigurationProperty{Configuration::nifi_c2_agent_identifier},
+  
core::ConfigurationProperty{Configuration::nifi_c2_agent_identifier_fallback},
   core::ConfigurationProperty{Configuration::nifi_c2_agent_trigger_classes},
   core::ConfigurationProperty{Configuration::nifi_c2_root_classes},
   core::ConfigurationProperty{Configuration::nifi_c2_root_class_definitions},
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index b6fe4cc3e..ef76b9f8c 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -45,18 +45,16 @@
 #include "io/NetworkPrioritizer.h"
 #include "io/FileStream.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 FlowController::FlowController(std::shared_ptr<core::Repository> 
provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
                                std::shared_ptr<Configure> configure, 
std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                               std::shared_ptr<core::ContentRepository> 
content_repo, const std::string /*name*/,
-                               std::shared_ptr<utils::file::FileSystem> 
filesystem)
+                               std::shared_ptr<core::ContentRepository> 
content_repo, const std::string& /*name*/,
+                               std::shared_ptr<utils::file::FileSystem> 
filesystem, std::function<void()> request_restart)
     : 
core::controller::ForwardingControllerServiceProvider(core::getClassName<FlowController>()),
       c2::C2Client(std::move(configure), std::move(provenance_repo), 
std::move(flow_file_repo),
-                   std::move(content_repo), std::move(flow_configuration), 
std::move(filesystem)),
+                   std::move(content_repo), std::move(flow_configuration), 
std::move(filesystem),
+                   std::move(request_restart), 
core::logging::LoggerFactory<c2::C2Client>::getLogger()),
       running_(false),
       updating_(false),
       initialized_(false),
@@ -76,9 +74,10 @@ 
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
 
 FlowController::FlowController(std::shared_ptr<core::Repository> 
provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
                  std::shared_ptr<Configure> configure, 
std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                 std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<utils::file::FileSystem> filesystem)
+                 std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<utils::file::FileSystem> filesystem,
+                 std::function<void()> request_restart)
       : FlowController(std::move(provenance_repo), std::move(flow_file_repo), 
std::move(configure), std::move(flow_configuration),
-                       std::move(content_repo), DEFAULT_ROOT_GROUP_NAME, 
std::move(filesystem)) {}
+                       std::move(content_repo), DEFAULT_ROOT_GROUP_NAME, 
std::move(filesystem), std::move(request_restart)) {}
 
 std::optional<std::chrono::milliseconds> 
FlowController::loadShutdownTimeoutFromConfiguration() {
   std::string shutdown_timeout_str;
@@ -426,13 +425,14 @@ int16_t FlowController::clearConnection(const std::string 
&connection) {
   return -1;
 }
 
-std::shared_ptr<state::response::ResponseNode> 
FlowController::getAgentManifest() const {
+std::shared_ptr<state::response::ResponseNode> 
FlowController::getAgentManifest() {
   auto agentInfo = 
std::make_shared<state::response::AgentInformation>("agentInfo");
   
agentInfo->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(getControllerService(c2::C2Agent::UPDATE_NAME)).get());
   agentInfo->setAgentIdentificationProvider(configuration_);
   agentInfo->setConfigurationReader([this](const std::string& key){
     return configuration_->getString(key);
   });
+  agentInfo->setStateMonitor(this);
   agentInfo->includeAgentStatus(false);
   return agentInfo;
 }
@@ -552,7 +552,4 @@ state::StateController* 
FlowController::getProcessorController(const std::string
   return foundController.get();
 }
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 4d1d55df6..dbc349ea2 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -47,27 +47,25 @@
 
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
+namespace org::apache::nifi::minifi::c2 {
 
 C2Agent::C2Agent(core::controller::ControllerServiceProvider *controller,
                  state::Pausable *pause_handler,
                  state::StateMonitor* updateSink,
-                 const std::shared_ptr<Configure> &configuration,
-                 const std::shared_ptr<utils::file::FileSystem> &filesystem)
+                 std::shared_ptr<Configure> configuration,
+                 std::shared_ptr<utils::file::FileSystem> filesystem,
+                 std::function<void()> request_restart)
     : heart_beat_period_(3s),
       max_c2_responses(5),
       update_sink_(updateSink),
       update_service_(nullptr),
       controller_(controller),
       pause_handler_(pause_handler),
-      configuration_(configuration),
-      filesystem_(filesystem),
+      configuration_(std::move(configuration)),
+      filesystem_(std::move(filesystem)),
       protocol_(nullptr),
-      thread_pool_(2, false, nullptr, "C2 threadpool") {
+      thread_pool_(2, false, nullptr, "C2 threadpool"),
+      request_restart_(std::move(request_restart)) {
   manifest_sent_ = false;
 
   last_run_ = std::chrono::steady_clock::now();
@@ -80,7 +78,7 @@ C2Agent::C2Agent(core::controller::ControllerServiceProvider 
*controller,
     // create a stubbed service for updating the flow identifier
   }
 
-  configure(configuration, false);
+  configure(configuration_, false);
 
   functions_.emplace_back([this] {return produce();});
   functions_.emplace_back([this] {return consume();});
@@ -347,7 +345,7 @@ void C2Agent::handle_c2_server_response(const 
C2ContentResponse &resp) {
       update_sink_->stop();
       C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
       protocol_.load()->consumePayload(std::move(response));
-      restart_agent();
+      restart_needed_ = true;
     }
       break;
     case Operation::START:
@@ -634,6 +632,7 @@ void C2Agent::handlePropertyUpdate(const C2ContentResponse 
&resp) {
   }
   C2Payload response(Operation::ACKNOWLEDGE, result, resp.ident, true);
   enqueue_c2_response(std::move(response));
+  if (result != state::UpdateState::NO_OPERATION) { restart_needed_ = true; }
 }
 
 /**
@@ -715,19 +714,6 @@ void C2Agent::handle_transfer(const C2ContentResponse 
&resp) {
   }
 }
 
-void C2Agent::restart_agent() {
-  std::string cwd = utils::Environment::getCurrentWorkingDirectory();
-  if (cwd.empty()) {
-    logger_->log_error("Could not restart the agent because the working 
directory could not be determined");
-    return;
-  }
-
-  std::string command = cwd + "/bin/minifi.sh restart";
-  if (system(command.c_str()) != 0) {
-    logger_->log_error("System command '%s' failed", command);
-  }
-}
-
 utils::TaskRescheduleInfo C2Agent::produce() {
   // place priority on messages to send to the c2 server
   if (protocol_.load() != nullptr) {
@@ -755,6 +741,12 @@ utils::TaskRescheduleInfo C2Agent::produce() {
           }
         });
 
+    if (restart_needed_ && requests.empty()) {
+      configuration_->commitChanges();
+      request_restart_();
+      return utils::TaskRescheduleInfo::Done();
+    }
+
     try {
       performHeartBeat();
     }
@@ -911,8 +903,4 @@ void C2Agent::enqueue_c2_server_response(C2Payload &&resp) {
   responses.enqueue(std::move(resp));
 }
 
-}  // namespace c2
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 14009c5b2..a4f7dd887 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+#include <filesystem>
 #include <memory>
 #include <map>
 #include "c2/C2Client.h"
@@ -30,22 +31,20 @@
 #include "c2/C2Agent.h"
 #include "core/state/nodes/FlowInformation.h"
 #include "utils/file/FileSystem.h"
+#include "utils/file/FileUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
+namespace org::apache::nifi::minifi::c2 {
 
 C2Client::C2Client(
     std::shared_ptr<Configure> configuration, 
std::shared_ptr<core::Repository> provenance_repo,
     std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::ContentRepository> content_repo,
     std::unique_ptr<core::FlowConfiguration> flow_configuration, 
std::shared_ptr<utils::file::FileSystem> filesystem,
-    std::shared_ptr<core::logging::Logger> logger)
+    std::function<void()> request_restart, 
std::shared_ptr<core::logging::Logger> logger)
     : core::Flow(std::move(provenance_repo), std::move(flow_file_repo), 
std::move(content_repo), std::move(flow_configuration)),
       configuration_(std::move(configuration)),
       filesystem_(std::move(filesystem)),
-      logger_(std::move(logger)) {}
+      logger_(std::move(logger)),
+      request_restart_(std::move(request_restart)) {}
 
 void C2Client::stopC2() {
   if (c2_agent_) {
@@ -68,7 +67,14 @@ void 
C2Client::initialize(core::controller::ControllerServiceProvider *controlle
     logger_->log_info("Agent class is not predefined");
   }
 
-  configuration_->setFallbackAgentIdentifier(getControllerUUID().to_string());
+  // Set a persistent fallback agent id. This is needed so that the C2 server 
can identify the same agent after a restart, even if nifi.c2.agent.identifier 
is not specified.
+  if (auto id = 
configuration_->get(Configuration::nifi_c2_agent_identifier_fallback)) {
+    configuration_->setFallbackAgentIdentifier(*id);
+  } else {
+    const auto agent_id = getControllerUUID().to_string();
+    configuration_->setFallbackAgentIdentifier(agent_id);
+    configuration_->set(Configuration::nifi_c2_agent_identifier_fallback, 
agent_id, PropertyChangeLifetime::PERSISTENT);
+  }
 
   {
     std::lock_guard<std::mutex> lock(initialization_mutex_);
@@ -141,7 +147,7 @@ void 
C2Client::initialize(core::controller::ControllerServiceProvider *controlle
   if (!initialized_) {
     // C2Agent is initialized once, meaning that a C2-triggered 
flow/configuration update
     // might not be equal to a fresh restart
-    c2_agent_ = std::make_unique<c2::C2Agent>(controller, pause_handler, 
update_sink, configuration_, filesystem_);
+    c2_agent_ = std::make_unique<c2::C2Agent>(controller, pause_handler, 
update_sink, configuration_, filesystem_, request_restart_);
     c2_agent_->start();
     initialized_ = true;
   }
@@ -355,8 +361,4 @@ void C2Client::updateResponseNodeConnections() {
   }
 }
 
-}  // namespace c2
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/src/core/state/Value.cpp 
b/libminifi/src/core/state/Value.cpp
index 49cc8fda7..120b228a8 100644
--- a/libminifi/src/core/state/Value.cpp
+++ b/libminifi/src/core/state/Value.cpp
@@ -20,13 +20,11 @@
 #include <openssl/sha.h>
 #include <utility>
 #include <string>
+#include "rapidjson/document.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-namespace response {
+namespace org::apache::nifi::minifi::state::response {
 
 const std::type_index Value::UINT64_TYPE = std::type_index(typeid(uint64_t));
 const std::type_index Value::INT64_TYPE = std::type_index(typeid(int64_t));
@@ -58,10 +56,36 @@ std::string hashResponseNodes(const 
std::vector<SerializedResponseNode>& nodes)
   return utils::StringUtils::to_hex(digest, true /*uppercase*/);
 }
 
-} /* namespace response */
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+namespace {
+rapidjson::Value nodeToJson(const SerializedResponseNode& node, 
rapidjson::MemoryPoolAllocator<rapidjson::CrtAllocator>& alloc) {
+  if (node.value.empty()) {
+    if (node.array) {
+      rapidjson::Value result(rapidjson::kArrayType);
+      for (const auto& elem: node.children) {
+        result.PushBack(nodeToJson(elem, alloc), alloc);
+      }
+      return result;
+    } else {
+      rapidjson::Value result(rapidjson::kObjectType);
+      for (const auto& elem: node.children) {
+        result.AddMember(rapidjson::Value(elem.name.c_str(), alloc), 
nodeToJson(elem, alloc), alloc);
+      }
+      return result;
+    }
+  } else {
+    return rapidjson::Value(node.value.to_string().c_str(), alloc);
+  }
+}
+}  // namespace
+
+std::string SerializedResponseNode::to_string() const {
+  rapidjson::Document doc;
+  doc.SetObject();
+  doc.AddMember(rapidjson::Value(name.c_str(), doc.GetAllocator()), 
nodeToJson(*this, doc.GetAllocator()), doc.GetAllocator());
+  rapidjson::StringBuffer buf;
+  rapidjson::Writer<rapidjson::StringBuffer> writer{buf};
+  doc.Accept(writer);
+  return buf.GetString();
+}
+}  // namespace org::apache::nifi::minifi::state::response
 
diff --git a/extensions/systemd/WorkerThread.cpp 
b/libminifi/src/utils/FifoExecutor.cpp
similarity index 77%
rename from extensions/systemd/WorkerThread.cpp
rename to libminifi/src/utils/FifoExecutor.cpp
index 4213d29f8..f7bf0a807 100644
--- a/extensions/systemd/WorkerThread.cpp
+++ b/libminifi/src/utils/FifoExecutor.cpp
@@ -15,11 +15,9 @@
  * limitations under the License.
  */
 
-#include "WorkerThread.h"
+#include "utils/FifoExecutor.h"
 
-namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd {
-
-namespace detail {
+namespace org::apache::nifi::minifi::utils::detail {
 WorkerThread::WorkerThread()
     : thread_{&WorkerThread::run, this} {}
 
@@ -33,11 +31,4 @@ void WorkerThread::run() noexcept {
     task_queue_.consumeWait([](std::packaged_task<void()>&& f) { f(); });
   }
 }
-}  // namespace detail
-
-}  // namespace systemd
-}  // namespace extensions
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::utils::detail
diff --git a/libminifi/src/utils/file/FileUtils.cpp 
b/libminifi/src/utils/file/FileUtils.cpp
index 456211130..2dc6f41f8 100644
--- a/libminifi/src/utils/file/FileUtils.cpp
+++ b/libminifi/src/utils/file/FileUtils.cpp
@@ -106,6 +106,13 @@ std::chrono::time_point<std::chrono::system_clock> 
to_sys_time_point(const std::
 #endif
 }
 
+void put_content(const std::filesystem::path& filename, std::string_view 
new_contents) {
+  std::ofstream ofs;
+  ofs.exceptions(std::ofstream::badbit | std::ofstream::failbit);
+  ofs.open(filename, std::ofstream::binary);
+  ofs.write(new_contents.data(), 
gsl::narrow<std::streamsize>(new_contents.size()));
+}
+
 }  // namespace file
 }  // namespace utils
 }  // namespace minifi
diff --git a/libminifi/test/aws-tests/FetchS3ObjectTests.cpp 
b/libminifi/test/aws-tests/FetchS3ObjectTests.cpp
index 844f33d0a..06999e321 100644
--- a/libminifi/test/aws-tests/FetchS3ObjectTests.cpp
+++ b/libminifi/test/aws-tests/FetchS3ObjectTests.cpp
@@ -26,7 +26,7 @@
 namespace {
 
 using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
-using org::apache::nifi::minifi::utils::file::get_file_content;
+using org::apache::nifi::minifi::utils::file::get_content;
 using org::apache::nifi::minifi::utils::file::get_separator;
 
 class FetchS3ObjectTestsFixture : public 
FlowProcessorS3TestsFixture<minifi::aws::processors::FetchS3Object> {
@@ -120,7 +120,7 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test default 
properties", "[awsS3Co
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), 
"key:s3.expirationTimeRuleId value:" + S3_EXPIRATION_TIME_RULE_ID));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), 
"key:s3.sseAlgorithm value:" + S3_SSEALGORITHM_STR));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), 
"key:s3.version value:" + S3_VERSION_1));
-  REQUIRE(get_file_content(output_dir + get_separator() + INPUT_FILENAME) == 
S3_CONTENT);
+  REQUIRE(get_content(output_dir + get_separator() + INPUT_FILENAME) == 
S3_CONTENT);
   
REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetVersionId().empty());
   
REQUIRE(!mock_s3_request_sender_ptr->get_object_request.VersionIdHasBeenSet());
   REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetRequestPayer() == 
Aws::S3::Model::RequestPayer::NOT_SET);
@@ -140,7 +140,7 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test empty 
optional S3 results", "[
   
REQUIRE(!LogTestController::getInstance().contains("key:s3.expirationTimeRuleId",
 std::chrono::seconds(0), std::chrono::milliseconds(0)));
   REQUIRE(!LogTestController::getInstance().contains("key:s3.sseAlgorithm", 
std::chrono::seconds(0), std::chrono::milliseconds(0)));
   REQUIRE(!LogTestController::getInstance().contains("key:s3.version", 
std::chrono::seconds(0), std::chrono::milliseconds(0)));
-  REQUIRE(get_file_content(output_dir + get_separator() + 
INPUT_FILENAME).empty());
+  REQUIRE(get_content(output_dir + get_separator() + INPUT_FILENAME).empty());
 }
 
 TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test subdirectories on AWS", 
"[awsS3Config]") {
@@ -150,7 +150,7 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test 
subdirectories on AWS", "[awsS
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), 
"key:filename value:logs.txt"));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:path 
value:dir1/dir2"));
   REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), 
"key:absolute.path value:dir1/dir2/logs.txt"));
-  REQUIRE(get_file_content(output_dir + get_separator() + 
INPUT_FILENAME).empty());
+  REQUIRE(get_content(output_dir + get_separator() + INPUT_FILENAME).empty());
 }
 
 TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test optional values are set in 
request", "[awsS3Config]") {
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h 
b/libminifi/test/flow-tests/TestControllerWithFlow.h
index 9841ea8ba..9151c6209 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -60,7 +60,8 @@ class TestControllerWithFlow: public TestController{
     controller_ = std::make_shared<minifi::FlowController>(
         prov_repo, ff_repo, configuration_,
         std::move(flow),
-        content_repo, DEFAULT_ROOT_GROUP_NAME);
+        content_repo, DEFAULT_ROOT_GROUP_NAME,
+        std::make_shared<utils::file::FileSystem>(), []{});
     controller_->load(std::move(root));
   }
 
diff --git a/libminifi/test/integration/IntegrationBase.h 
b/libminifi/test/integration/IntegrationBase.h
index d247bd4bd..1c43656db 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -19,6 +19,7 @@
 
 #define DEFAULT_WAITTIME_MSECS 3000
 
+#include <future>
 #include <memory>
 #include <optional>
 #include <string>
@@ -34,6 +35,7 @@
 #include "core/ConfigurableComponent.h"
 #include "controllers/SSLContextService.h"
 #include "HTTPUtils.h"
+#include "utils/FifoExecutor.h"
 
 namespace minifi = org::apache::nifi::minifi;
 namespace core = minifi::core;
@@ -42,7 +44,30 @@ namespace utils = minifi::utils;
 class IntegrationBase {
  public:
   explicit IntegrationBase(std::chrono::milliseconds waitTime = 
std::chrono::milliseconds(DEFAULT_WAITTIME_MSECS));
-
+  IntegrationBase(const IntegrationBase&) = delete;
+  IntegrationBase(IntegrationBase&& other) noexcept
+      :configuration{std::move(other.configuration)},
+      flowController_{std::move(other.flowController_)},
+      wait_time_{other.wait_time_},
+      port{std::move(other.port)},
+      scheme{std::move(other.scheme)},
+      key_dir{std::move(other.key_dir)},
+      state_dir{std::move(other.state_dir)},
+      restart_requested_count_{other.restart_requested_count_.load()}
+  {}
+  IntegrationBase& operator=(const IntegrationBase&) = delete;
+  IntegrationBase& operator=(IntegrationBase&& other) noexcept {
+    if (&other == this) return *this;
+    configuration = std::move(other.configuration);
+    flowController_ = std::move(other.flowController_);
+    wait_time_ = other.wait_time_;
+    port = std::move(other.port);
+    scheme = std::move(other.scheme);
+    key_dir = std::move(other.key_dir);
+    state_dir = std::move(other.state_dir);
+    restart_requested_count_ = other.restart_requested_count_.load();
+    return *this;
+  }
   virtual ~IntegrationBase() = default;
 
   virtual void run(const std::optional<std::string>& test_file_location = {}, 
const std::optional<std::string>& home_path = {});
@@ -93,6 +118,7 @@ class IntegrationBase {
   std::string port, scheme;
   std::string key_dir;
   std::string state_dir;
+  std::atomic<int> restart_requested_count_{0};
 };
 
 IntegrationBase::IntegrationBase(std::chrono::milliseconds waitTime)
@@ -111,6 +137,7 @@ void IntegrationBase::configureSecurity() {
 }
 
 void IntegrationBase::run(const std::optional<std::string>& 
test_file_location, const std::optional<std::string>& home_path) {
+  using namespace std::literals::chrono_literals;
   testSetup();
 
   std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
@@ -126,46 +153,67 @@ void IntegrationBase::run(const 
std::optional<std::string>& test_file_location,
 
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
-
-  bool should_encrypt_flow_config = 
(configuration->get(minifi::Configure::nifi_flow_configuration_encrypt)
-                                     | 
utils::flatMap(utils::StringUtils::toBool)).value_or(false);
 
-  std::shared_ptr<utils::file::FileSystem> filesystem;
-  if (home_path) {
-    filesystem = std::make_shared<utils::file::FileSystem>(
-        should_encrypt_flow_config,
-        utils::crypto::EncryptionProvider::create(*home_path));
-  } else {
-    filesystem = std::make_shared<utils::file::FileSystem>();
-  }
-
-  std::unique_ptr<core::FlowConfiguration> flow_config = 
std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location, filesystem));
+  std::atomic<bool> running = true;
+  utils::FifoExecutor assertion_runner;
+  std::future<void> assertions_done;
+  while (running) {
+    running = false;  // Stop running after this iteration, unless restart is 
explicitly requested
 
-  auto controller_service_provider = 
flow_config->getControllerServiceProvider();
-  char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
-  state_dir = utils::file::create_temp_directory(state_dir_name_template);
-  if 
(!configuration->get(minifi::Configure::nifi_state_management_provider_local_path))
 {
-    
configuration->set(minifi::Configure::nifi_state_management_provider_local_path,
 state_dir);
-  }
-  
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(),
 configuration);
+    std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
 
-  std::shared_ptr<core::ProcessGroup> pg(flow_config->getRoot());
-  queryRootProcessGroup(pg);
+    bool should_encrypt_flow_config = 
(configuration->get(minifi::Configure::nifi_flow_configuration_encrypt)
+        | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
 
-  std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
+    std::shared_ptr<utils::file::FileSystem> filesystem;
+    if (home_path) {
+      filesystem = std::make_shared<utils::file::FileSystem>(
+          should_encrypt_flow_config,
+          utils::crypto::EncryptionProvider::create(*home_path));
+    } else {
+      filesystem = std::make_shared<utils::file::FileSystem>();
+    }
 
-  flowController_ = std::make_unique<minifi::FlowController>(test_repo, 
test_flow_repo, configuration, std::move(flow_config), content_repo, 
DEFAULT_ROOT_GROUP_NAME);
-  flowController_->load();
-  updateProperties(*flowController_);
-  flowController_->start();
+    auto flow_config = std::make_unique<core::YamlConfiguration>(test_repo, 
test_repo, content_repo, stream_factory, configuration, test_file_location, 
filesystem);
 
-  runAssertions();
+    auto controller_service_provider = 
flow_config->getControllerServiceProvider();
+    char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
+    state_dir = utils::file::create_temp_directory(state_dir_name_template);
+    if 
(!configuration->get(minifi::Configure::nifi_state_management_provider_local_path))
 {
+      
configuration->set(minifi::Configure::nifi_state_management_provider_local_path,
 state_dir);
+    }
+    
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(),
 configuration);
+
+    std::shared_ptr<core::ProcessGroup> pg(flow_config->getRoot());
+    queryRootProcessGroup(pg);
+
+    std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
+
+    const auto request_restart = [&, this] {
+      ++restart_requested_count_;
+      running = true;
+    };
+    flowController_ = std::make_unique<minifi::FlowController>(test_repo, 
test_flow_repo, configuration, std::move(flow_config), content_repo, 
DEFAULT_ROOT_GROUP_NAME,
+        std::make_shared<utils::file::FileSystem>(), request_restart);
+    flowController_->load();
+    updateProperties(*flowController_);
+    flowController_->start();
+
+    assertions_done = assertion_runner.enqueue([this] { runAssertions(); });
+    std::future_status status = std::future_status::ready;
+    while (!running && (status = assertions_done.wait_for(10ms)) == 
std::future_status::timeout) { /* wait */ }
+    if (running && status != std::future_status::timeout) {
+      // cancel restart, because assertions have finished running
+      running = false;
+    }
 
-  shutdownBeforeFlowController();
-  flowController_->unload();
-  flowController_->stopC2();
+    if (!running) {
+      // Only stop servers if we're shutting down
+      shutdownBeforeFlowController();
+    }
+    flowController_->unload();
+    flowController_->stopC2();
+  }
 
   cleanup();
 }
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp 
b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 0a78b2fd5..80aad7d0e 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -20,17 +20,11 @@
 #undef NDEBUG
 #include <cassert>
 #include <chrono>
-#include <fstream>
-#include <utility>
 #include <memory>
+#include <utility>
 #include <string>
 #include <thread>
-#include <type_traits>
-#include <vector>
 #include "utils/file/FileUtils.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
 #include "core/ProcessGroup.h"
 #include "core/yaml/YamlConfiguration.h"
 #include "FlowController.h"
@@ -38,7 +32,6 @@
 #include "../unit/ProvenanceTestHelper.h"
 #include "io/StreamFactory.h"
 #include "../TestBase.h"
-#include "../Catch.h"
 #include "utils/IntegrationTestUtils.h"
 
 int main(int argc, char **argv) {
@@ -63,12 +56,13 @@ int main(int argc, char **argv) {
   configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
   std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location));
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::make_unique<core::YamlConfiguration>(
+      test_repo, test_repo, content_repo, stream_factory, configuration, 
test_file_location);
   std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
 
-  std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(
-      test_repo, test_flow_repo, configuration, std::move(yaml_ptr), 
content_repo, DEFAULT_ROOT_GROUP_NAME);
+  const auto controller = std::make_shared<minifi::FlowController>(
+      test_repo, test_flow_repo, configuration, std::move(yaml_ptr), 
content_repo, DEFAULT_ROOT_GROUP_NAME,
+      std::make_shared<utils::file::FileSystem>(), []{});
 
   core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location);
 
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp 
b/libminifi/test/persistence-tests/PersistenceTests.cpp
index d28011596..276a06445 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -176,7 +176,7 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
 
   auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, 
ff_repository, content_repo, nullptr, config, "");
   auto flowController = std::make_shared<minifi::FlowController>(
-      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, 
"");
+      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, 
"", std::make_shared<utils::file::FileSystem>(), []{});
 
   {
     TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, 
MergeContent::Merge);
@@ -290,7 +290,7 @@ TEST_CASE("Persisted flowFiles are updated on 
modification", "[TestP1]") {
 
   auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, 
ff_repository, content_repo, nullptr, config, "");
   auto flowController = std::make_shared<minifi::FlowController>(
-      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, 
"");
+      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, 
"", std::make_shared<utils::file::FileSystem>(), []{});
 
   {
     TestFlow flow(ff_repository, content_repo, prov_repo, 
setupContentUpdaterProcessor, {"success", "d"});
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp 
b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 3ca9f950b..5e96aec40 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -290,7 +290,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, 
ff_repository, content_repo, nullptr, config, "");
   auto flowController = std::make_shared<minifi::FlowController>(
-      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, 
"");
+      prov_repo, ff_repository, config, std::move(flowConfig), content_repo, 
"", std::make_shared<utils::file::FileSystem>(), []{});
 
   std::string data = "banana";
   minifi::io::BufferStream content(data);
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h 
b/libminifi/test/unit/ProvenanceTestHelper.h
index 760904bd8..65c91bf1f 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -244,7 +244,8 @@ class TestFlowController : public 
org::apache::nifi::minifi::FlowController {
   
TestFlowController(std::shared_ptr<org::apache::nifi::minifi::core::Repository> 
repo, std::shared_ptr<org::apache::nifi::minifi::core::Repository> 
flow_file_repo,
       const 
std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& 
/*content_repo*/)
       :org::apache::nifi::minifi::FlowController(repo, flow_file_repo, 
std::make_shared<org::apache::nifi::minifi::Configure>(), nullptr,
-          
std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(),
 "") {
+          
std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>(),
 "",
+          
std::make_shared<org::apache::nifi::minifi::utils::file::FileSystem>(), []{}) {
   }
 
   ~TestFlowController() override = default;
diff --git a/main/AgentDocs.cpp b/main/AgentDocs.cpp
index 222e76a2e..bd5b834af 100644
--- a/main/AgentDocs.cpp
+++ b/main/AgentDocs.cpp
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -34,18 +33,13 @@
 #include "core/Relationship.h"
 #include "io/validation.h"
 #include "utils/file/FileUtils.h"
-#include "agent/build_description.h"
 #include "agent/agent_docs.h"
 #include "agent/agent_version.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace docs {
+namespace org::apache::nifi::minifi::docs {
 
 std::string AgentDocs::extractClassName(const std::string &processor) const {
-  auto positionOfLastDot = processor.find_last_of(".");
+  auto positionOfLastDot = processor.find_last_of('.');
   if (positionOfLastDot != std::string::npos) {
     return processor.substr(positionOfLastDot + 1);
   }
@@ -55,7 +49,7 @@ std::string AgentDocs::extractClassName(const std::string 
&processor) const {
 void AgentDocs::generate(const std::string &docsdir, std::ostream &genStream) {
   std::map<std::string, ClassDescription> processorSet;
   for (const auto &group : minifi::AgentBuild::getExtensions()) {
-    struct Components descriptions = 
BuildDescription::getClassDescriptions(group);
+    struct Components descriptions = 
build_description_.getClassDescriptions(group);
     for (const auto &processorName : descriptions.processors_) {
       
processorSet.insert(std::make_pair(extractClassName(processorName.class_name_), 
processorName));
     }
@@ -64,18 +58,18 @@ void AgentDocs::generate(const std::string &docsdir, 
std::ostream &genStream) {
     const std::string &filename = docsdir + utils::file::get_separator() + 
processor.first;
     std::ofstream outfile(filename);
 
-    std::string description;
-
-    bool foundDescription = minifi::AgentDocs::getDescription(processor.first, 
description);
-
-    if (!foundDescription) {
-      foundDescription = 
minifi::AgentDocs::getDescription(processor.second.class_name_, description);
-    }
+    {
+      std::string description;
+      bool foundDescription = 
minifi::AgentDocs::getDescription(processor.first, description);
+      if (!foundDescription) {
+        foundDescription = 
minifi::AgentDocs::getDescription(processor.second.class_name_, description);
+      }
 
-    outfile << "## " << processor.first << std::endl << std::endl;
-    if (foundDescription) {
-      outfile << "### Description " << std::endl << std::endl;
-      outfile << description << std::endl;
+      outfile << "## " << processor.first << std::endl << std::endl;
+      if (foundDescription) {
+        outfile << "### Description " << std::endl << std::endl;
+        outfile << description << std::endl;
+      }
     }
 
     outfile << "### Properties " << std::endl << std::endl;
@@ -165,8 +159,4 @@ void AgentDocs::generate(const std::string &docsdir, 
std::ostream &genStream) {
   }
 }
 
-} /* namespace docs */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::docs
diff --git a/main/AgentDocs.h b/main/AgentDocs.h
index 0402adf11..1b880b3eb 100644
--- a/main/AgentDocs.h
+++ b/main/AgentDocs.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,26 +18,18 @@
 #define MAIN_AGENTDOCS_H_
 
 #include <iostream>
+#include "agent/build_description.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace docs {
+namespace org::apache::nifi::minifi::docs {
 
 class AgentDocs {
  public:
-  AgentDocs() = default;
-  ~AgentDocs() = default;
   void generate(const std::string &docsdir, std::ostream &genStream);
  private:
-  inline std::string extractClassName(const std::string &processor) const;
+  [[nodiscard]] inline std::string extractClassName(const std::string 
&processor) const;
+  BuildDescription build_description_;
 };
 
-} /* namespace docs */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::docs
 
 #endif  // MAIN_AGENTDOCS_H_
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index e90b8665d..70afd8085 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -43,6 +43,7 @@
 #include <signal.h>
 #include <sodium.h>
 
+#include <atomic>
 #include <cstdlib>
 #include <iostream>
 #include <memory>
@@ -68,8 +69,8 @@ namespace core = minifi::core;
 namespace utils = minifi::utils;
 
  // Variables that allow us to avoid a timed wait.
-sem_t *running;
-//! Flow Controller
+static sem_t *flow_controller_running;
+static sem_t *process_running;
 
 /**
  * Removed the stop command from the signal handler so that we could trigger
@@ -83,24 +84,27 @@ sem_t *running;
 
 #ifdef WIN32
 BOOL WINAPI consoleSignalHandler(DWORD signal) {
+  if (!process_running) { exit(0); return TRUE; }
   if (signal == CTRL_C_EVENT || signal == CTRL_BREAK_EVENT) {
-    sem_post(running);
-    if (sem_wait(running) == -1)
-      perror("sem_wait");
+    int ret = ETIMEDOUT;
+    while (ret == ETIMEDOUT) {
+      if (flow_controller_running) { sem_post(flow_controller_running); }
+      const struct timespec timeout_100ms { .tv_sec = 0, .tv_nsec = 100000000};
+      ret = sem_timedwait(process_running, &timeout_100ms);
+    }
+    return TRUE;
   }
-
-  return TRUE;
+  return FALSE;
 }
 
 void SignalExitProcess() {
-  sem_post(running);
+  sem_post(flow_controller_running);
 }
 #endif
 
 void sigHandler(int signal) {
   if (signal == SIGINT || signal == SIGTERM) {
-    // avoid stopping the controller here.
-    sem_post(running);
+    sem_post(flow_controller_running);
   }
 }
 
@@ -148,20 +152,6 @@ int main(int argc, char **argv) {
     return -1;
   }
 
-  uint16_t stop_wait_time = STOP_WAIT_TIME_MS;
-
-  std::string graceful_shutdown_seconds;
-  std::string prov_repo_class = "provenancerepository";
-  std::string flow_repo_class = "flowfilerepository";
-  std::string nifi_configuration_class_name = "yamlconfiguration";
-  std::string content_repo_class = "filesystemrepository";
-
-  running = sem_open("/MiNiFiMain", O_CREAT, 0644, 0);
-  if (running == SEM_FAILED || running == 0) {
-    logger->log_error("could not initialize semaphore");
-    perror("initialization failure");
-  }
-
 #ifdef WIN32
   if (!SetConsoleCtrlHandler(consoleSignalHandler, TRUE)) {
     logger->log_error("Cannot install signal handler");
@@ -184,237 +174,274 @@ int main(int argc, char **argv) {
     return -1;
   }
 #endif
-
   // Determine MINIFI_HOME
   const std::string minifiHome = determineMinifiHome(logger);
   if (minifiHome.empty()) {
     // determineMinifiHome already logged everything we need
     return -1;
   }
-
   // chdir to MINIFI_HOME
   if (!utils::Environment::setCurrentWorkingDirectory(minifiHome.c_str())) {
     logger->log_error("Failed to change working directory to MINIFI_HOME 
(%s)", minifiHome);
     return -1;
   }
+  const auto flow_controller_semaphore_path = "/MiNiFiMain";
+  const auto process_semaphore_path = "/MiNiFiProc";
 
-  const auto log_properties = 
std::make_shared<core::logging::LoggerProperties>();
-  log_properties->setHome(minifiHome);
-  log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
-  
core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
-
-  std::shared_ptr<minifi::Properties> uid_properties = 
std::make_shared<minifi::Properties>("UID properties");
-  uid_properties->setHome(minifiHome);
-  uid_properties->loadConfigureFile(DEFAULT_UID_PROPERTIES_FILE);
-  utils::IdGenerator::getIdGenerator()->initialize(uid_properties);
-
-  // Make a record of minifi home in the configured log file.
-  logger->log_info("MINIFI_HOME=%s", minifiHome);
-
-  auto decryptor = minifi::Decryptor::create(minifiHome);
-  if (decryptor) {
-    logger->log_info("Found encryption key, will decrypt sensitive properties 
in the configuration");
-  } else {
-    logger->log_info("No encryption key found, will not decrypt sensitive 
properties in the configuration");
+  process_running = sem_open(process_semaphore_path, O_CREAT, 0644, 0);
+  if (process_running == SEM_FAILED) {
+    logger->log_error("could not initialize process semaphore");
+    perror("sem_open");
+    return -1;
   }
 
-  const std::shared_ptr<minifi::Configure> configure = 
std::make_shared<minifi::Configure>(std::move(decryptor), 
std::move(log_properties));
-  configure->setHome(minifiHome);
-  configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
-
-  minifi::core::extension::ExtensionManager::get().initialize(configure);
-
-  if (argc >= 3 && std::string("docs") == argv[1]) {
-    if (utils::file::create_dir(argv[2]) != 0) {
-      std::cerr << "Working directory doesn't exist and cannot be created: " 
<< argv[2] << std::endl;
-      exit(1);
-    }
-
-    std::cerr << "Dumping docs to " << argv[2] << std::endl;
-    if (argc == 4) {
-      std::string filepath, filename;
-      utils::file::PathUtils::getFileNameAndPath(argv[3], filepath, filename);
-      if (filepath == argv[2]) {
-        std::cerr << "Target file should be out of the working directory: " << 
filepath << std::endl;
-        exit(1);
-      }
-      std::ofstream outref(argv[3]);
-      dumpDocs(configure, argv[2], outref);
-    } else {
-      dumpDocs(configure, argv[2], std::cout);
+  std::atomic<bool> restart_token{false};
+  const auto request_restart = [&] {
+    if (!restart_token.exchange(true)) {
+      // only do sem_post if a restart is not already in progress (the flag 
was unset before the exchange)
+      sem_post(flow_controller_running);
+      logger->log_info("Initiating restart...");
     }
-    exit(0);
-  }
-
+  };
 
-  if (configure->get(minifi::Configure::nifi_graceful_shutdown_seconds, 
graceful_shutdown_seconds)) {
-    try {
-      stop_wait_time = std::stoi(graceful_shutdown_seconds);
-    }
-    catch (const std::out_of_range &e) {
-      logger->log_error("%s is out of range. %s", 
minifi::Configure::nifi_graceful_shutdown_seconds, e.what());
-    }
-    catch (const std::invalid_argument &e) {
-      logger->log_error("%s contains an invalid argument set. %s", 
minifi::Configure::nifi_graceful_shutdown_seconds, e.what());
+  do {
+    flow_controller_running = sem_open(flow_controller_semaphore_path, 
O_CREAT, 0644, 0);
+    if (flow_controller_running == SEM_FAILED) {
+      logger->log_error("could not initialize flow controller semaphore");
+      perror("sem_open");
+      return -1;
     }
-  }
-  else {
-    logger->log_debug("%s not set, defaulting to %d", 
minifi::Configure::nifi_graceful_shutdown_seconds,
-      STOP_WAIT_TIME_MS);
-  }
-
-  configure->get(minifi::Configure::nifi_provenance_repository_class_name, 
prov_repo_class);
-  // Create repos for flow record and provenance
-  std::shared_ptr<core::Repository> prov_repo = 
core::createRepository(prov_repo_class, true, "provenance");
 
-  if (!prov_repo->initialize(configure)) {
-    logger->log_error("Provenance repository failed to initialize, exiting..");
-    exit(1);
-  }
-
-  configure->get(minifi::Configure::nifi_flow_repository_class_name, 
flow_repo_class);
-
-  std::shared_ptr<core::Repository> flow_repo = 
core::createRepository(flow_repo_class, true, "flowfile");
-
-  if (!flow_repo->initialize(configure)) {
-    logger->log_error("Flow file repository failed to initialize, exiting..");
-    exit(1);
-  }
+    uint16_t stop_wait_time = STOP_WAIT_TIME_MS;
 
-  configure->get(minifi::Configure::nifi_content_repository_class_name, 
content_repo_class);
+    std::string graceful_shutdown_seconds;
+    std::string prov_repo_class = "provenancerepository";
+    std::string flow_repo_class = "flowfilerepository";
+    std::string nifi_configuration_class_name = "yamlconfiguration";
+    std::string content_repo_class = "filesystemrepository";
 
-  std::shared_ptr<core::ContentRepository> content_repo = 
core::createContentRepository(content_repo_class, true, "content");
+    const auto log_properties = 
std::make_shared<core::logging::LoggerProperties>();
+    log_properties->setHome(minifiHome);
+    log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE);
+    
core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties);
 
-  if (!content_repo->initialize(configure)) {
-    logger->log_error("Content repository failed to initialize, exiting..");
-    exit(1);
-  }
-
-  std::string content_repo_path;
-  if 
(configure->get(minifi::Configure::nifi_dbcontent_repository_directory_default, 
content_repo_path)) {
-    core::logging::LOG_INFO(logger) << "setting default dir to " << 
content_repo_path;
-    minifi::setDefaultDirectory(content_repo_path);
-  }
+    std::shared_ptr<minifi::Properties> uid_properties = 
std::make_shared<minifi::Properties>("UID properties");
+    uid_properties->setHome(minifiHome);
+    uid_properties->loadConfigureFile(DEFAULT_UID_PROPERTIES_FILE);
+    utils::IdGenerator::getIdGenerator()->initialize(uid_properties);
 
-  configure->get(minifi::Configure::nifi_configuration_class_name, 
nifi_configuration_class_name);
+    // Make a record of minifi home in the configured log file.
+    logger->log_info("MINIFI_HOME=%s", minifiHome);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configure);
+    auto decryptor = minifi::Decryptor::create(minifiHome);
+    if (decryptor) {
+      logger->log_info("Found encryption key, will decrypt sensitive 
properties in the configuration");
+    } else {
+      logger->log_info("No encryption key found, will not decrypt sensitive 
properties in the configuration");
+    }
 
-  bool should_encrypt_flow_config = 
(configure->get(minifi::Configure::nifi_flow_configuration_encrypt)
-      | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+    const std::shared_ptr<minifi::Configure> configure = 
std::make_shared<minifi::Configure>(std::move(decryptor), 
std::move(log_properties));
+    configure->setHome(minifiHome);
+    configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
 
-  auto filesystem = std::make_shared<utils::file::FileSystem>(
-      should_encrypt_flow_config,
-      utils::crypto::EncryptionProvider::create(minifiHome));
+    minifi::core::extension::ExtensionManager::get().initialize(configure);
 
-  std::unique_ptr<core::FlowConfiguration> flow_configuration = 
core::createFlowConfiguration(
-      prov_repo, flow_repo, content_repo, configure, stream_factory, 
nifi_configuration_class_name,
-      configure->get(minifi::Configure::nifi_flow_configuration_file), 
filesystem);
+    if (argc >= 3 && std::string("docs") == argv[1]) {
+      if (utils::file::create_dir(argv[2]) != 0) {
+        std::cerr << "Working directory doesn't exist and cannot be created: " 
<< argv[2] << std::endl;
+        exit(1);
+      }
 
-  const auto controller = std::make_unique<minifi::FlowController>(
-      prov_repo, flow_repo, configure, std::move(flow_configuration), 
content_repo, filesystem);
+      std::cerr << "Dumping docs to " << argv[2] << std::endl;
+      if (argc == 4) {
+        std::string filepath, filename;
+        utils::file::PathUtils::getFileNameAndPath(argv[3], filepath, 
filename);
+        if (filepath == argv[2]) {
+          std::cerr << "Target file should be out of the working directory: " 
<< filepath << std::endl;
+          exit(1);
+        }
+        std::ofstream outref(argv[3]);
+        dumpDocs(configure, argv[2], outref);
+      } else {
+        dumpDocs(configure, argv[2], std::cout);
+      }
+      exit(0);
+    }
 
-  const bool disk_space_watchdog_enable = 
(configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | 
utils::map([](const std::string& v){ return v == "true"; })).value_or(true);
-  std::unique_ptr<utils::CallBackTimer> disk_space_watchdog;
-  if (disk_space_watchdog_enable) {
-    try {
-      const auto repo_paths = [&] {
-        std::vector<std::string> repo_paths;
-        repo_paths.reserve(3);
-        // REPOSITORY_DIRECTORY is a dummy path used by noop repositories
-        const auto path_valid = [](const std::string& p) { return !p.empty() 
&& p != REPOSITORY_DIRECTORY; };
-        auto prov_repo_path = prov_repo->getDirectory();
-        auto flow_repo_path = flow_repo->getDirectory();
-        auto content_repo_storage_path = content_repo->getStoragePath();
-        if (!prov_repo->isNoop() && path_valid(prov_repo_path)) { 
repo_paths.push_back(std::move(prov_repo_path)); }
-        if (!flow_repo->isNoop() && path_valid(flow_repo_path)) { 
repo_paths.push_back(std::move(flow_repo_path)); }
-        if (path_valid(content_repo_storage_path)) { 
repo_paths.push_back(std::move(content_repo_storage_path)); }
-        return repo_paths;
-      }();
-      const auto available_spaces = 
minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get());
-      const auto config = minifi::disk_space_watchdog::read_config(*configure);
-      const auto min_space = [](const std::vector<std::uintmax_t>& spaces) {
-        const auto it = std::min_element(std::begin(spaces), std::end(spaces));
-        return it != spaces.end() ? *it : 
(std::numeric_limits<std::uintmax_t>::max)();
-      };
-      if (min_space(available_spaces) <= config.stop_threshold_bytes) {
-        logger->log_error("Cannot start MiNiFi due to insufficient available 
disk space");
-        return -1;
+    if (configure->get(minifi::Configure::nifi_graceful_shutdown_seconds, 
graceful_shutdown_seconds)) {
+      try {
+        stop_wait_time = std::stoi(graceful_shutdown_seconds);
       }
-      auto interval_switch = 
minifi::disk_space_watchdog::disk_space_interval_switch(config);
-      disk_space_watchdog = 
std::make_unique<utils::CallBackTimer>(config.interval, [interval_switch, 
min_space, repo_paths, logger, &controller]() mutable {
-        const auto stop = [&]{ controller->stop(); controller->unload(); };
-        const auto restart = [&]{ controller->load(); controller->start(); };
-        const auto switch_state = 
interval_switch(min_space(minifi::disk_space_watchdog::check_available_space(repo_paths,
 logger.get())));
-        if (switch_state.state == utils::IntervalSwitchState::LOWER && 
switch_state.switched) {
-          logger->log_warn("Stopping flow controller due to insufficient disk 
space");
-          stop();
-        } else if (switch_state.state == utils::IntervalSwitchState::UPPER && 
switch_state.switched) {
-          logger->log_info("Restarting flow controller");
-          restart();
-        }
-      });
-    } catch (const std::runtime_error& error) {
-      logger->log_error(error.what());
-      return -1;
+      catch (const std::out_of_range& e) {
+        logger->log_error("%s is out of range. %s", 
minifi::Configure::nifi_graceful_shutdown_seconds, e.what());
+      }
+      catch (const std::invalid_argument& e) {
+        logger->log_error("%s contains an invalid argument set. %s", 
minifi::Configure::nifi_graceful_shutdown_seconds, e.what());
+      }
+    } else {
+      logger->log_debug("%s not set, defaulting to %d", 
minifi::Configure::nifi_graceful_shutdown_seconds,
+          STOP_WAIT_TIME_MS);
     }
-  }
 
-  logger->log_info("Loading FlowController");
+    configure->get(minifi::Configure::nifi_provenance_repository_class_name, 
prov_repo_class);
+    // Create repos for flow record and provenance
+    std::shared_ptr<core::Repository> prov_repo = 
core::createRepository(prov_repo_class, true, "provenance");
 
-  // Load flow from specified configuration file
-  try {
-    controller->load();
-  }
-  catch (std::exception &e) {
-    logger->log_error("Failed to load configuration due to exception: %s", 
e.what());
-    return -1;
-  }
-  catch (...) {
-    logger->log_error("Failed to load configuration due to unknown exception");
-    return -1;
-  }
+    if (!prov_repo->initialize(configure)) {
+      logger->log_error("Provenance repository failed to initialize, 
exiting..");
+      exit(1);
+    }
 
-  // Start Processing the flow
-  controller->start();
+    configure->get(minifi::Configure::nifi_flow_repository_class_name, 
flow_repo_class);
 
-  if (disk_space_watchdog) { disk_space_watchdog->start(); }
+    std::shared_ptr<core::Repository> flow_repo = 
core::createRepository(flow_repo_class, true, "flowfile");
 
-  logger->log_info("MiNiFi started");
+    if (!flow_repo->initialize(configure)) {
+      logger->log_error("Flow file repository failed to initialize, 
exiting..");
+      exit(1);
+    }
 
-  /**
-   * Sem wait provides us the ability to have a controlled
-   * yield without the need for a more complex construct and
-   * a spin lock
-   */
-  int ret_val;
-  while ((ret_val = sem_wait(running)) == -1 && errno == EINTR);
-  if (ret_val == -1) perror("sem_wait");
+    configure->get(minifi::Configure::nifi_content_repository_class_name, 
content_repo_class);
 
-  while ((ret_val = sem_close(running)) == -1 && errno == EINTR);
-  if (ret_val == -1) perror("sem_close");
+    std::shared_ptr<core::ContentRepository> content_repo = 
core::createContentRepository(content_repo_class, true, "content");
 
-  while ((ret_val = sem_unlink("/MiNiFiMain")) == -1 && errno == EINTR);
-  if (ret_val == -1) perror("sem_unlink");
+    if (!content_repo->initialize(configure)) {
+      logger->log_error("Content repository failed to initialize, exiting..");
+      exit(1);
+    }
 
-  disk_space_watchdog = nullptr;
+    std::string content_repo_path;
+    if 
(configure->get(minifi::Configure::nifi_dbcontent_repository_directory_default, 
content_repo_path)) {
+      core::logging::LOG_INFO(logger) << "setting default dir to " << 
content_repo_path;
+      minifi::setDefaultDirectory(content_repo_path);
+    }
 
-  /**
-   * Trigger unload -- wait stop_wait_time
-   */
-  controller->waitUnload(stop_wait_time);
+    configure->get(minifi::Configure::nifi_configuration_class_name, 
nifi_configuration_class_name);
+
+    std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configure);
+
+    bool should_encrypt_flow_config = 
(configure->get(minifi::Configure::nifi_flow_configuration_encrypt)
+        | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+
+    auto filesystem = std::make_shared<utils::file::FileSystem>(
+        should_encrypt_flow_config,
+        utils::crypto::EncryptionProvider::create(minifiHome));
+
+    std::unique_ptr<core::FlowConfiguration> flow_configuration = 
core::createFlowConfiguration(
+        prov_repo, flow_repo, content_repo, configure, stream_factory, 
nifi_configuration_class_name,
+        configure->get(minifi::Configure::nifi_flow_configuration_file), 
filesystem);
+
+    const auto controller = std::make_unique<minifi::FlowController>(
+        prov_repo, flow_repo, configure, std::move(flow_configuration), 
content_repo, filesystem, request_restart);
+
+    const bool disk_space_watchdog_enable = 
(configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | 
utils::map([](const std::string& v) { return v == "true"; })).value_or(true);
+    std::unique_ptr<utils::CallBackTimer> disk_space_watchdog;
+    if (disk_space_watchdog_enable) {
+      try {
+        const auto repo_paths = [&] {
+          std::vector<std::string> repo_paths;
+          repo_paths.reserve(3);
+          // REPOSITORY_DIRECTORY is a dummy path used by noop repositories
+          const auto path_valid = [](const std::string& p) { return !p.empty() 
&& p != REPOSITORY_DIRECTORY; };
+          auto prov_repo_path = prov_repo->getDirectory();
+          auto flow_repo_path = flow_repo->getDirectory();
+          auto content_repo_storage_path = content_repo->getStoragePath();
+          if (!prov_repo->isNoop() && path_valid(prov_repo_path)) { 
repo_paths.push_back(std::move(prov_repo_path)); }
+          if (!flow_repo->isNoop() && path_valid(flow_repo_path)) { 
repo_paths.push_back(std::move(flow_repo_path)); }
+          if (path_valid(content_repo_storage_path)) { 
repo_paths.push_back(std::move(content_repo_storage_path)); }
+          return repo_paths;
+        }();
+        const auto available_spaces = 
minifi::disk_space_watchdog::check_available_space(repo_paths, logger.get());
+        const auto config = 
minifi::disk_space_watchdog::read_config(*configure);
+        const auto min_space = [](const std::vector<std::uintmax_t>& spaces) {
+          const auto it = std::min_element(std::begin(spaces), 
std::end(spaces));
+          return it != spaces.end() ? *it : 
(std::numeric_limits<std::uintmax_t>::max)();
+        };
+        if (min_space(available_spaces) <= config.stop_threshold_bytes) {
+          logger->log_error("Cannot start MiNiFi due to insufficient available 
disk space");
+          return -1;
+        }
+        auto interval_switch = 
minifi::disk_space_watchdog::disk_space_interval_switch(config);
+        disk_space_watchdog = 
std::make_unique<utils::CallBackTimer>(config.interval, [interval_switch, 
min_space, repo_paths, logger, &controller]() mutable {
+          const auto stop = [&] {
+            controller->stop();
+            controller->unload();
+          };
+          const auto restart = [&] {
+            controller->load();
+            controller->start();
+          };
+          const auto switch_state = 
interval_switch(min_space(minifi::disk_space_watchdog::check_available_space(repo_paths,
 logger.get())));
+          if (switch_state.state == utils::IntervalSwitchState::LOWER && 
switch_state.switched) {
+            logger->log_warn("Stopping flow controller due to insufficient 
disk space");
+            stop();
+          } else if (switch_state.state == utils::IntervalSwitchState::UPPER 
&& switch_state.switched) {
+            logger->log_info("Restarting flow controller");
+            restart();
+          }
+        });
+      } catch (const std::runtime_error& error) {
+        logger->log_error(error.what());
+        return -1;
+      }
+    }
 
-  controller->stopC2();
+    logger->log_info("Loading FlowController");
 
-  flow_repo = nullptr;
+    // Load flow from specified configuration file
+    try {
+      controller->load();
+    }
+    catch (std::exception& e) {
+      logger->log_error("Failed to load configuration due to exception: %s", 
e.what());
+      return -1;
+    }
+    catch (...) {
+      logger->log_error("Failed to load configuration due to unknown 
exception");
+      return -1;
+    }
 
-  prov_repo = nullptr;
+    // Start Processing the flow
+    controller->start();
+
+    if (disk_space_watchdog) { disk_space_watchdog->start(); }
+
+    logger->log_info("MiNiFi started");
+
+    /**
+     * Sem wait provides us the ability to have a controlled
+     * yield without the need for a more complex construct and
+     * a spin lock
+     */
+    int ret_val;
+    while ((ret_val = sem_wait(flow_controller_running)) == -1 && errno == 
EINTR) {}
+    if (ret_val == -1) perror("sem_wait");
+
+    while ((ret_val = sem_close(flow_controller_running)) == -1 && errno == 
EINTR) {}
+    if (ret_val == -1) perror("sem_close");
+    flow_controller_running = nullptr;
+
+    while ((ret_val = sem_unlink(flow_controller_semaphore_path)) == -1 && 
errno == EINTR) {}
+    if (ret_val == -1) perror("sem_unlink");
+
+    disk_space_watchdog = nullptr;
+
+    /**
+     * Trigger unload -- wait stop_wait_time
+     */
+    controller->waitUnload(stop_wait_time);
+    controller->stopC2();
+    flow_repo = nullptr;
+    prov_repo = nullptr;
+  } while ([&] {
+    const auto restart_token_temp = restart_token.exchange(false);
+    if (restart_token_temp) {
+      logger->log_info("Restarting MiNiFi");
+    }
+    return restart_token_temp;
+  }());
 
+  if (process_running) { sem_post(process_running); }
   logger->log_info("MiNiFi exit");
-
-#ifdef WIN32
-  sem_post(running);
-#endif
-
   return 0;
 }
diff --git a/nanofi/src/cxx/C2CallbackAgent.cpp 
b/nanofi/src/cxx/C2CallbackAgent.cpp
index 9a7c755c9..d4af51260 100644
--- a/nanofi/src/cxx/C2CallbackAgent.cpp
+++ b/nanofi/src/cxx/C2CallbackAgent.cpp
@@ -28,15 +28,11 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/FileManager.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace c2 {
+namespace org::apache::nifi::minifi::c2 {
 
 C2CallbackAgent::C2CallbackAgent(core::controller::ControllerServiceProvider* 
controller, state::Pausable* pause_handler, state::StateMonitor* updateSink,
                                  const std::shared_ptr<Configure> 
&configuration)
-    : C2Agent(controller, pause_handler, updateSink, configuration),
+    : C2Agent(controller, pause_handler, updateSink, configuration, 
std::make_shared<utils::file::FileSystem>(), []{}),
       stop(nullptr) {
 }
 
@@ -72,8 +68,4 @@ void C2CallbackAgent::handle_c2_server_response(const 
C2ContentResponse &resp) {
   }
 }
 
-} /* namespace c2 */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} /* namespace org::apache::nifi::minifi::c2 */

Reply via email to