This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 34839437140f92ab96149f531ea4d4dd9e63b2b6
Author: Adam Debreceni <[email protected]>
AuthorDate: Thu Jul 25 09:22:37 2024 +0000

    MINIFICPP-2314 - Send asset state hash in heartbeat, implement c2 asset sync
    
    Closes #1751
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 C2.md                                              |   9 +-
 CONFIGURE.md                                       |   9 +-
 conf/minifi.properties                             |   3 +-
 .../cluster/containers/MinifiContainer.py          |   4 +-
 encrypt-config/tests/resources/minifi.properties   |   2 +-
 ...th-additional-sensitive-props.minifi.properties |   2 +-
 libminifi/include/FlowController.h                 |   4 +-
 libminifi/include/c2/C2Agent.h                     |   8 +-
 libminifi/include/c2/C2Payload.h                   |  76 +++++-
 libminifi/include/c2/PayloadParser.h               |  27 +-
 libminifi/include/c2/PayloadSerializer.h           |  10 +-
 libminifi/include/c2/protocols/RESTProtocol.h      |   2 +-
 .../include/core/state/MetricsPublisherStore.h     |   3 +-
 .../include/core/state/nodes/AssetInformation.h    |  42 ++++
 .../include/core/state/nodes/ResponseNodeLoader.h  |   5 +-
 libminifi/include/properties/Configuration.h       |   1 +
 libminifi/include/utils/file/AssetManager.h        |  73 ++++++
 libminifi/include/utils/file/PathUtils.h           |  22 ++
 libminifi/src/Configuration.cpp                    |   1 +
 libminifi/src/FlowController.cpp                   |   5 +-
 libminifi/src/c2/C2Agent.cpp                       | 232 +++++++++++++----
 libminifi/src/c2/C2Payload.cpp                     |  14 +-
 libminifi/src/c2/HeartbeatJsonSerializer.cpp       |  15 +-
 libminifi/src/c2/protocols/RESTProtocol.cpp        |  26 +-
 libminifi/src/c2/protocols/RESTSender.cpp          |  13 +
 libminifi/src/c2/triggers/FileUpdateTrigger.cpp    |   4 +-
 libminifi/src/core/state/MetricsPublisherStore.cpp |   4 +-
 .../src/core/state/nodes/AssetInformation.cpp      |  47 ++++
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |  14 +-
 .../src/core/state/nodes/SupportedOperations.cpp   |   4 +
 libminifi/src/utils/file/AssetManager.cpp          | 190 ++++++++++++++
 libminifi/test/integration/C2AssetSyncTest.cpp     | 280 +++++++++++++++++++++
 .../integration/C2ClearCoreComponentStateTest.cpp  |   2 +-
 .../test/integration/C2DescribeMetricsTest.cpp     |   4 +-
 libminifi/test/integration/C2MetricsTest.cpp       |  12 +-
 libminifi/test/integration/C2UpdateAssetTest.cpp   |  47 ++--
 .../test/libtest/integration/HTTPHandlers.cpp      |  58 +++--
 libminifi/test/libtest/integration/HTTPHandlers.h  |   8 +-
 .../test/libtest/integration/IntegrationBase.cpp   |   6 +-
 .../test/libtest/integration/IntegrationBase.h     |   2 +
 .../test/resources/encrypted.minifi.properties     |   2 +-
 libminifi/test/unit/PayloadParserTests.cpp         |  16 +-
 minifi_main/MiNiFiMain.cpp                         |   9 +-
 43 files changed, 1101 insertions(+), 216 deletions(-)

diff --git a/C2.md b/C2.md
index 9d84974ff..ad492ec57 100644
--- a/C2.md
+++ b/C2.md
@@ -66,9 +66,10 @@ be requested via C2 DESCRIBE manifest command.
     #   DeviceInfoNode: basic info about the system (OS, number of cores etc)
     #   AgentInformation: info about the MiNiFi agent, may include the manifest
     #   FlowInformation: information about the current flow, including queue 
sizes
+    #   AssetInformation: the state of the asset directory managed by the agent
     #   ConfigurationChecksums: hashes of the configuration files; can be used 
to detect unexpected modifications
     # the default is
-    nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
+    
nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation
 
     # control c2 heartbeat interval
     nifi.c2.agent.heartbeat.period=30 sec
@@ -80,8 +81,10 @@ be requested via C2 DESCRIBE manifest command.
     nifi.c2.rest.listener.cacert=<SSL Cert path>
 
     # specify the rest URIs if using RESTSender
-    
nifi.c2.rest.url=http://<your-c2-server>/<c2-api-path>/c2-protocol/heartbeat
-    
nifi.c2.rest.url.ack=http://<your-c2-server>/<c2-api-path>/c2-protocol/acknowledge
+    nifi.c2.rest.path.base=https://<your-c2-server>/<c2-api-path>
+    # specify either absolute url or relative to the nifi.c2.rest.path.base 
url for heartbeat and acknowledge
+    nifi.c2.rest.url=/c2-protocol/heartbeat
+    nifi.c2.rest.url.ack=/c2-protocol/acknowledge
     nifi.c2.flow.base.url=http://<your-c2-server>/<c2-api-path>/c2-protocol/
 
     # c2 agent identifier -- must be defined to run agent
diff --git a/CONFIGURE.md b/CONFIGURE.md
index 9f93c8a44..c645a5bc4 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -665,8 +665,13 @@ Additionally, a unique hexadecimal 
uid.minifi.device.segment should be assigned
 
 ### Asset directory
 
-It is possible to make agents download an asset (triggered through the c2 
protocol). The target directory can be specified
-using the `nifi.asset.directory` agent property, which defaults to 
`${MINIFI_HOME}/asset`.
+There is an asset directory specified using the `nifi.asset.directory` agent 
property, which defaults to `${MINIFI_HOME}/asset`.
+The files referenced in the `.state` file in this directory are managed by the 
agent. They are deleted, updated, downloaded
+using the asset sync c2 command. For the asset sync command to work, the c2 
server must be made aware of the current state of the
+managed assets by adding the `AssetInformation` entry to the 
`nifi.c2.root.classes` property.
+
+Files and directories not referenced in the `.state` file are not directly 
controlled by the agent, although
+it is possible to download an asset (triggered through the c2 protocol) into 
the asset directory instead.
 
 ### Controller Services
  If you need to reference a controller service in your config.yml file, use 
the following template. In the example, below, ControllerServiceClass is the 
name of the class defining the controller Service. ControllerService1
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 5e5c29545..425e6e64b 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -84,10 +84,11 @@ nifi.content.repository.class.name=DatabaseContentRepository
 ## base URL of the c2 server,
 ## very likely the same base url of rest urls
 #nifi.c2.flow.base.url=
+#nifi.c2.rest.path.base=
 #nifi.c2.rest.url=
 #nifi.c2.rest.url.ack=
 #nifi.c2.rest.ssl.context.service=
-nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
+nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation
 ## Minimize heartbeat payload size by excluding agent manifest from the 
heartbeat
 nifi.c2.full.heartbeat=false
 ## heartbeat twice a minute
diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py 
b/docker/test/integration/cluster/containers/MinifiContainer.py
index bbe56a879..fee054f06 100644
--- a/docker/test/integration/cluster/containers/MinifiContainer.py
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -99,7 +99,7 @@ class MinifiContainer(FlowContainer):
                 
f.write(f"nifi.c2.rest.url=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config/heartbeat\n")
                 
f.write(f"nifi.c2.rest.url.ack=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config/acknowledge\n")
                 
f.write(f"nifi.c2.flow.base.url=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config/\n")
-                
f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation\n")
+                
f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation\n")
                 f.write("nifi.c2.full.heartbeat=false\n")
                 f.write("nifi.c2.agent.class=minifi-test-class\n")
                 f.write("nifi.c2.agent.identifier=minifi-test-id\n")
@@ -109,7 +109,7 @@ class MinifiContainer(FlowContainer):
                 
f.write(f"nifi.c2.rest.url.ack=https://minifi-c2-server-{self.feature_context.id}:10090/c2/config/acknowledge\n")
                 f.write("nifi.c2.rest.ssl.context.service=SSLContextService\n")
                 
f.write(f"nifi.c2.flow.base.url=https://minifi-c2-server-{self.feature_context.id}:10090/c2/config/\n")
-                
f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation\n")
+                
f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation\n")
                 f.write("nifi.c2.full.heartbeat=false\n")
                 f.write("nifi.c2.agent.class=minifi-test-class\n")
                 f.write("nifi.c2.agent.identifier=minifi-test-id\n")
diff --git a/encrypt-config/tests/resources/minifi.properties 
b/encrypt-config/tests/resources/minifi.properties
index 9bac06775..2f2db68bb 100644
--- a/encrypt-config/tests/resources/minifi.properties
+++ b/encrypt-config/tests/resources/minifi.properties
@@ -55,7 +55,7 @@ nifi.c2.enable=true
 nifi.c2.flow.base.url=http://localhost:10080/c2-server/api
 nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat
 
nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge
-nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
+nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation
 ## Minimize heartbeat payload size by excluding agent manifest from the 
heartbeat
 #nifi.c2.full.heartbeat=false
 ## heartbeat 4 times a second
diff --git 
a/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties
 
b/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties
index aff36a065..d2702c34d 100644
--- 
a/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties
+++ 
b/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties
@@ -57,7 +57,7 @@ nifi.c2.enable=true
 nifi.c2.flow.base.url=http://localhost:10080/c2-server/api
 nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat
 
nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge
-nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
+nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation
 ## Minimize heartbeat payload size by excluding agent manifest from the 
heartbeat
 #nifi.c2.full.heartbeat=false
 ## heartbeat 4 times a second
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index 6f14f0446..c4a3402b9 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -55,6 +55,7 @@
 #include "TimerDrivenSchedulingAgent.h"
 #include "utils/Id.h"
 #include "utils/file/FileSystem.h"
+#include "utils/file/AssetManager.h"
 #include "core/state/nodes/ResponseNodeLoader.h"
 #include "core/state/MetricsPublisher.h"
 #include "core/state/MetricsPublisherStore.h"
@@ -72,7 +73,8 @@ class FlowController : public 
core::controller::ForwardingControllerServiceProvi
   FlowController(std::shared_ptr<core::Repository> provenance_repo, 
std::shared_ptr<core::Repository> flow_file_repo,
                  std::shared_ptr<Configure> configure, 
std::shared_ptr<core::FlowConfiguration> flow_configuration,
                  std::shared_ptr<core::ContentRepository> content_repo, 
std::unique_ptr<state::MetricsPublisherStore> metrics_publisher_store = nullptr,
-                 std::shared_ptr<utils::file::FileSystem> filesystem = 
std::make_shared<utils::file::FileSystem>(), std::function<void()> 
request_restart = []{});
+                 std::shared_ptr<utils::file::FileSystem> filesystem = 
std::make_shared<utils::file::FileSystem>(), std::function<void()> 
request_restart = []{},
+                 utils::file::AssetManager* asset_manager = {});
 
   ~FlowController() override;
 
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 63829fd2c..3f0e12a7c 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -43,6 +43,7 @@
 #include "utils/ThreadPool.h"
 #include "utils/file/FileSystem.h"
 #include "C2Utils.h"
+#include "utils/file/AssetManager.h"
 
 namespace org::apache::nifi::minifi::c2 {
 
@@ -62,7 +63,8 @@ class C2Agent : public state::UpdateController {
   C2Agent(std::shared_ptr<Configure> configuration,
           std::weak_ptr<state::response::NodeReporter> node_reporter,
           std::shared_ptr<utils::file::FileSystem> filesystem,
-          std::function<void()> request_restart);
+          std::function<void()> request_restart,
+          utils::file::AssetManager* asset_manager);
 
   void initialize(core::controller::ControllerServiceProvider *controller, 
state::Pausable *pause_handler, state::StateMonitor* update_sink);
   void start() override;
@@ -131,6 +133,8 @@ class C2Agent : public state::UpdateController {
    */
   void handle_describe(const C2ContentResponse &resp);
 
+  void handle_sync(const C2ContentResponse &resp);
+
 
   enum class UpdateResult {
     NO_UPDATE,
@@ -235,6 +239,8 @@ class C2Agent : public state::UpdateController {
 
   // time point the last time we performed a heartbeat.
   std::chrono::steady_clock::time_point last_run_;
+
+  utils::file::AssetManager* asset_manager_;
 };
 
 }  // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h
index 9ff1aaaa7..09c90540f 100644
--- a/libminifi/include/c2/C2Payload.h
+++ b/libminifi/include/c2/C2Payload.h
@@ -29,6 +29,7 @@
 #include "utils/Enum.h"
 #include "utils/gsl.h"
 #include "utils/span.h"
+#include "rapidjson/document.h"
 
 namespace org::apache::nifi::minifi::c2 {
 
@@ -43,7 +44,8 @@ enum class Operation : uint8_t {
   clear,
   transfer,
   pause,
-  resume
+  resume,
+  sync
 };
 
 enum class DescribeOperand : uint8_t {
@@ -70,6 +72,10 @@ enum class ClearOperand : uint8_t{
   corecomponentstate
 };
 
+enum class SyncOperand : uint8_t{
+  resource
+};
+
 #define PAYLOAD_NO_STATUS 0
 #define PAYLOAD_SUCCESS 1
 #define PAYLOAD_FAILURE 2
@@ -79,21 +85,65 @@ enum Direction {
   RECEIVE
 };
 
-struct AnnotatedValue : state::response::ValueNode {
-  using state::response::ValueNode::ValueNode;
-  using state::response::ValueNode::operator=;
+class C2Value {
+ public:
+  friend std::ostream& operator<<(std::ostream& out, const C2Value& val);
+
+  C2Value() = default;
+  C2Value(const C2Value& other) {
+    (*this) = other;
+  }
+  C2Value(C2Value&&) = default;
+  template<typename T>
+  requires(std::constructible_from<state::response::ValueNode, T>)
+  explicit C2Value(T&& value) { value_ = 
state::response::ValueNode{std::forward<T>(value)}; }
+  explicit C2Value(const rapidjson::Value& json_value) {
+    value_.emplace<rapidjson::Document>();
+    get<rapidjson::Document>(value_).CopyFrom(json_value, 
get<rapidjson::Document>(value_).GetAllocator());
+  }
+  explicit C2Value(rapidjson::Document&& json_doc) {
+    value_ = std::move(json_doc);
+  }
 
-  [[nodiscard]] std::optional<std::reference_wrapper<const AnnotatedValue>> 
getAnnotation(const std::string& name) const {
-    auto it = annotations.find(name);
-    if (it == annotations.end()) {
-      return {};
+  C2Value& operator=(const C2Value& other) {
+    if (auto* other_val_node = 
get_if<state::response::ValueNode>(&other.value_)) {
+      value_ = *other_val_node;
+    } else {
+      value_.emplace<rapidjson::Document>();
+      
get<rapidjson::Document>(value_).CopyFrom(get<rapidjson::Document>(other.value_),
 get<rapidjson::Document>(value_).GetAllocator());
     }
-    return std::cref(it->second);
+    return *this;
+  }
+
+  C2Value& operator=(C2Value&&) = default;
+
+
+  bool empty() const {
+    if (auto* val_node = get_if<state::response::ValueNode>(&value_)) {
+      return val_node->empty();
+    }
+    return false;
+  }
+
+  std::string to_string() const {
+    if (auto* val_node = get_if<state::response::ValueNode>(&value_)) {
+      return val_node->to_string();
+    }
+    return std::string{get<rapidjson::Document>(value_).GetString(), 
get<rapidjson::Document>(value_).GetStringLength()};
+  }
+
+  const rapidjson::Document* json() const {
+    return get_if<rapidjson::Document>(&value_);
+  }
+
+  const state::response::ValueNode* valueNode() const {
+    return get_if<state::response::ValueNode>(&value_);
   }
 
-  friend std::ostream& operator<<(std::ostream& out, const AnnotatedValue& 
val);
+  bool operator==(const C2Value&) const = default;
 
-  std::map<std::string, AnnotatedValue> annotations;
+ private:
+  std::variant<state::response::ValueNode, rapidjson::Document> value_;
 };
 
 struct C2ContentResponse {
@@ -115,7 +165,7 @@ struct C2ContentResponse {
 
   friend std::ostream& operator<<(std::ostream& out, const C2ContentResponse& 
response);
 
-  std::optional<std::string> getArgument(const std::string& arg_name) const {
+  std::optional<std::string> getStringArgument(const std::string& arg_name) 
const {
     if (auto it = operation_arguments.find(arg_name); it != 
operation_arguments.end()) {
       return it->second.to_string();
     }
@@ -134,7 +184,7 @@ struct C2ContentResponse {
   // name applied to commands
   std::string name;
   // commands that correspond with the operation.
-  std::map<std::string, AnnotatedValue> operation_arguments;
+  std::map<std::string, C2Value> operation_arguments;
 };
 
 /**
diff --git a/libminifi/include/c2/PayloadParser.h 
b/libminifi/include/c2/PayloadParser.h
index d6a97642b..01b70a58e 100644
--- a/libminifi/include/c2/PayloadParser.h
+++ b/libminifi/include/c2/PayloadParser.h
@@ -138,27 +138,20 @@ class PayloadParser {
   }
 
   template<typename T>
-  inline T getAs(const std::string &field) {
+  inline T getAs(const std::string &field, const std::optional<T>& fallback = 
std::nullopt) {
     for (const auto &cmd : ref_.getContent()) {
-      auto exists = cmd.operation_arguments.find(field);
-      if (exists != cmd.operation_arguments.end()) {
-        return convert_if<T>(exists->second.getValue())();
+      if (auto it = cmd.operation_arguments.find(field); it != 
cmd.operation_arguments.end()) {
+        if (auto* val_node = it->second.valueNode()) {
+          return convert_if<T>(val_node->getValue())();
+        }
       }
     }
-    std::stringstream ss;
-    ss << "Invalid Field. Could not find " << field << " in " << 
ref_.getLabel();
-    throw PayloadParseException(ss.str());
-  }
-
-  template<typename T>
-  inline T getAs(const std::string &field, const T &fallback) {
-    for (const auto &cmd : ref_.getContent()) {
-      auto exists = cmd.operation_arguments.find(field);
-      if (exists != cmd.operation_arguments.end()) {
-        return convert_if<T>(exists->second.getValue())();
-      }
+    if (!fallback) {
+      std::stringstream ss;
+      ss << "Invalid Field. Could not find " << field << " in " << 
ref_.getLabel();
+      throw PayloadParseException(ss.str());
     }
-    return fallback;
+    return fallback.value();
   }
 
   size_t getSize() const {
diff --git a/libminifi/include/c2/PayloadSerializer.h 
b/libminifi/include/c2/PayloadSerializer.h
index 17de97792..e9ca060b9 100644
--- a/libminifi/include/c2/PayloadSerializer.h
+++ b/libminifi/include/c2/PayloadSerializer.h
@@ -42,7 +42,7 @@ class PayloadSerializer {
   /**
    * Static function that serializes the value nodes
    */
-  static void serializeValueNode(state::response::ValueNode &value, 
std::shared_ptr<io::OutputStream> stream) {
+  static void serializeValueNode(const state::response::ValueNode &value, 
std::shared_ptr<io::OutputStream> stream) {
     auto base_type = value.getValue();
     if (!base_type) {
       uint8_t type = 0;
@@ -95,7 +95,7 @@ class PayloadSerializer {
         stream->write(size);
         for (auto content : payload_content.operation_arguments) {
           stream->write(content.first);
-          serializeValueNode(content.second, stream);
+          serializeValueNode(*gsl::not_null(content.second.valueNode()), 
stream);
         }
       }
       if (nested_payload.getNestedPayloads().size() > 0) {
@@ -170,7 +170,7 @@ class PayloadSerializer {
       stream->write(size);
       for (auto content : payload_content.operation_arguments) {
         stream->write(content.first);
-        serializeValueNode(content.second, stream);
+        serializeValueNode(*gsl::not_null(content.second.valueNode()), stream);
       }
     }
     serialize(op, payload, stream);
@@ -251,7 +251,7 @@ class PayloadSerializer {
         for (uint32_t j = 0; j < args; j++) {
           std::string first, second;
           stream->read(first);
-          content.operation_arguments[first] = deserializeValueNode(stream);
+          content.operation_arguments[first] = 
C2Value{deserializeValueNode(stream)};
         }
         subPayload.addContent(std::move(content));
       }
@@ -293,7 +293,7 @@ class PayloadSerializer {
         std::string first, second;
         stream.read(first);
         // stream.readUTF(second);
-        content.operation_arguments[first] = deserializeValueNode(&stream);
+        content.operation_arguments[first] = 
C2Value{deserializeValueNode(&stream)};
       }
       newPayload.addContent(std::move(content));
     }
diff --git a/libminifi/include/c2/protocols/RESTProtocol.h 
b/libminifi/include/c2/protocols/RESTProtocol.h
index 36c741278..cb04a2de1 100644
--- a/libminifi/include/c2/protocols/RESTProtocol.h
+++ b/libminifi/include/c2/protocols/RESTProtocol.h
@@ -43,7 +43,7 @@ class RESTProtocol : public HeartbeatJsonSerializer {
  protected:
   void initialize(core::controller::ControllerServiceProvider* controller, 
const std::shared_ptr<Configure> &configure);
   void serializeNestedPayload(rapidjson::Value& target, const C2Payload& 
payload, rapidjson::Document::AllocatorType& alloc) override;
-  static C2Payload parseJsonResponse(const C2Payload &payload, std::span<const 
std::byte> response);
+  C2Payload parseJsonResponse(const C2Payload &payload, std::span<const 
std::byte> response) const;
 
  private:
   bool containsPayload(const C2Payload &o);
diff --git a/libminifi/include/core/state/MetricsPublisherStore.h 
b/libminifi/include/core/state/MetricsPublisherStore.h
index 544e82445..a1deb3cec 100644
--- a/libminifi/include/core/state/MetricsPublisherStore.h
+++ b/libminifi/include/core/state/MetricsPublisherStore.h
@@ -27,13 +27,14 @@
 #include "core/state/nodes/ResponseNodeLoader.h"
 #include "utils/gsl.h"
 #include "core/ProcessGroup.h"
+#include "utils/file/AssetManager.h"
 
 namespace org::apache::nifi::minifi::state {
 
 class MetricsPublisherStore {
  public:
   MetricsPublisherStore(std::shared_ptr<Configure> configuration, const 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& 
repository_metric_sources,
-    std::shared_ptr<core::FlowConfiguration> flow_configuration);
+    std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager = nullptr);
   void initialize(core::controller::ControllerServiceProvider* controller, 
state::StateMonitor* update_sink);
   void loadMetricNodes(core::ProcessGroup* root);
   void clearMetricNodes();
diff --git a/libminifi/include/core/state/nodes/AssetInformation.h 
b/libminifi/include/core/state/nodes/AssetInformation.h
new file mode 100644
index 000000000..feffa38a0
--- /dev/null
+++ b/libminifi/include/core/state/nodes/AssetInformation.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/file/AssetManager.h"
+#include "core/logging/Logger.h"
+
+namespace org::apache::nifi::minifi::state::response {
+
+class AssetInformation : public ResponseNode {
+ public:
+  AssetInformation();
+  explicit AssetInformation(std::string_view name, const utils::Identifier& 
uuid = {}) : ResponseNode(name, uuid) {}
+
+  MINIFIAPI static constexpr const char* Description = "Metric node that 
defines hash for all asset identifiers";
+
+  void setAssetManager(utils::file::AssetManager* asset_manager);
+
+  std::string getName() const override { return "resourceInfo"; }
+  std::vector<SerializedResponseNode> serialize() override;
+
+ private:
+  utils::file::AssetManager* asset_manager_{nullptr};
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h 
b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index af330585c..9eb55f413 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -34,13 +34,14 @@
 #include "utils/Id.h"
 #include "utils/expected.h"
 #include "core/RepositoryMetricsSource.h"
+#include "utils/file/AssetManager.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
 class ResponseNodeLoader {
  public:
   ResponseNodeLoader(std::shared_ptr<Configure> configuration, 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repository_metric_sources,
-    std::shared_ptr<core::FlowConfiguration> flow_configuration);
+    std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager = nullptr);
 
   void setNewConfigRoot(core::ProcessGroup* root);
   void clearConfigRoot();
@@ -62,6 +63,7 @@ class ResponseNodeLoader {
   void initializeAgentStatus(const SharedResponseNode& response_node) const;
   void initializeConfigurationChecksums(const SharedResponseNode& 
response_node) const;
   void initializeFlowMonitor(const SharedResponseNode& response_node) const;
+  void initializeAssetInformation(const SharedResponseNode& response_node) 
const;
   std::vector<SharedResponseNode> getMatchingComponentMetricsNodes(const 
std::string& regex_str) const;
 
   mutable std::mutex root_mutex_;
@@ -73,6 +75,7 @@ class ResponseNodeLoader {
   std::shared_ptr<Configure> configuration_;
   std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repository_metric_sources_;
   std::shared_ptr<core::FlowConfiguration> flow_configuration_;
+  utils::file::AssetManager* asset_manager_{};
   core::controller::ControllerServiceProvider* controller_{};
   state::StateMonitor* update_sink_{};
   std::shared_ptr<core::logging::Logger> 
logger_{core::logging::LoggerFactory<ResponseNodeLoader>::getLogger()};
diff --git a/libminifi/include/properties/Configuration.h 
b/libminifi/include/properties/Configuration.h
index 2d9300636..cea438ea9 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -118,6 +118,7 @@ class Configuration : public Properties {
   static constexpr const char *nifi_c2_root_class_definitions = 
"nifi.c2.root.class.definitions";
   static constexpr const char *nifi_c2_rest_listener_port = 
"nifi.c2.rest.listener.port";
   static constexpr const char *nifi_c2_rest_listener_cacert = 
"nifi.c2.rest.listener.cacert";
+  static constexpr const char *nifi_c2_rest_path_base = 
"nifi.c2.rest.path.base";
   static constexpr const char *nifi_c2_rest_url = "nifi.c2.rest.url";
   static constexpr const char *nifi_c2_rest_url_ack = "nifi.c2.rest.url.ack";
   static constexpr const char *nifi_c2_rest_ssl_context_service = 
"nifi.c2.rest.ssl.context.service";
diff --git a/libminifi/include/utils/file/AssetManager.h 
b/libminifi/include/utils/file/AssetManager.h
new file mode 100644
index 000000000..93424a145
--- /dev/null
+++ b/libminifi/include/utils/file/AssetManager.h
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <filesystem>
+#include <functional>
+#include <vector>
+#include <string>
+#include <memory>
+#include <set>
+#include "core/logging/Logger.h"
+#include "utils/expected.h"
+#include "properties/Configure.h"
+
+namespace org::apache::nifi::minifi::utils::file {
+
+struct AssetDescription {
+  std::string id;
+  std::filesystem::path path;
+  std::string url;
+
+  bool operator<(const AssetDescription& other) const {
+    return id < other.id;
+  }
+};
+
+struct AssetLayout {
+  std::string digest;
+  std::set<AssetDescription> assets;
+
+  void clear() {
+    digest.clear();
+    assets.clear();
+  }
+};
+
+class AssetManager {
+ public:
+  explicit AssetManager(const Configure& configuration);
+
+  nonstd::expected<void, std::string> sync(const AssetLayout& layout, const 
std::function<nonstd::expected<std::vector<std::byte>, 
std::string>(std::string_view /*url*/)>& fetch);
+
+  std::string hash() const;
+
+  std::filesystem::path getRoot() const;
+
+ private:
+  void refreshState();
+
+  void persist() const;
+
+  mutable std::recursive_mutex mtx_;
+  std::filesystem::path root_;
+  AssetLayout state_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi::utils::file
diff --git a/libminifi/include/utils/file/PathUtils.h 
b/libminifi/include/utils/file/PathUtils.h
index 1df2e9d89..04886c1a6 100644
--- a/libminifi/include/utils/file/PathUtils.h
+++ b/libminifi/include/utils/file/PathUtils.h
@@ -25,6 +25,7 @@
 #include <string>
 #include <system_error>
 #include <utility>
+#include "utils/expected.h"
 
 namespace org::apache::nifi::minifi::utils::file {
 
@@ -42,6 +43,27 @@ inline std::optional<std::filesystem::path> 
canonicalize(const std::filesystem::
   return result;
 }
 
+inline nonstd::expected<void, std::string> validateRelativeFilePath(const 
std::filesystem::path& path) {
+  if (path.empty()) {
+    return nonstd::make_unexpected("Empty file path");
+  }
+  if (!path.is_relative()) {
+    return nonstd::make_unexpected("File path must be a relative path '" + 
path.string() + "'");
+  }
+  if (!path.has_filename()) {
+    return nonstd::make_unexpected("Filename missing in output path '" + 
path.string() + "'");
+  }
+  if (path.filename() == "." || path.filename() == "..") {
+    return nonstd::make_unexpected("Invalid filename '" + 
path.filename().string() + "'");
+  }
+  for (const auto& segment : path) {
+    if (segment == "..") {
+      return nonstd::make_unexpected("Accessing parent directory is forbidden 
in file path '" + path.string() + "'");
+    }
+  }
+  return {};
+}
+
 
 /**
  * Represents filesystem space information in bytes
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 024393e57..116746c9c 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -88,6 +88,7 @@ const std::unordered_map<std::string_view, 
gsl::not_null<const core::PropertyVal
   {Configuration::nifi_c2_root_class_definitions, 
gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)},
   {Configuration::nifi_c2_rest_listener_port, 
gsl::make_not_null(&core::StandardPropertyTypes::LISTEN_PORT_TYPE)},
   {Configuration::nifi_c2_rest_listener_cacert, 
gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)},
+  {Configuration::nifi_c2_rest_path_base, 
gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)},
   {Configuration::nifi_c2_rest_url, 
gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)},
   {Configuration::nifi_c2_rest_url_ack, 
gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)},
   {Configuration::nifi_c2_rest_ssl_context_service, 
gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)},
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 8e5739d9d..39c24e320 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -50,7 +50,8 @@ namespace org::apache::nifi::minifi {
 FlowController::FlowController(std::shared_ptr<core::Repository> 
provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
                                std::shared_ptr<Configure> configure, 
std::shared_ptr<core::FlowConfiguration> flow_configuration,
                                std::shared_ptr<core::ContentRepository> 
content_repo, std::unique_ptr<state::MetricsPublisherStore> 
metrics_publisher_store,
-                               std::shared_ptr<utils::file::FileSystem> 
filesystem, std::function<void()> request_restart)
+                               std::shared_ptr<utils::file::FileSystem> 
filesystem, std::function<void()> request_restart,
+                               utils::file::AssetManager* asset_manager)
     : 
core::controller::ForwardingControllerServiceProvider(core::className<FlowController>()),
       running_(false),
       initialized_(false),
@@ -82,7 +83,7 @@ 
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
     if (auto publisher = 
metrics_publisher_store_->getMetricsPublisher(c2::C2_METRICS_PUBLISHER).lock()) 
{
       c2_metrics_publisher = 
std::dynamic_pointer_cast<c2::C2MetricsPublisher>(publisher);
     }
-    c2_agent_ = std::make_unique<c2::C2Agent>(configuration_, 
c2_metrics_publisher, std::move(filesystem), std::move(request_restart));
+    c2_agent_ = std::make_unique<c2::C2Agent>(configuration_, 
c2_metrics_publisher, std::move(filesystem), std::move(request_restart), 
asset_manager);
   }
 
   if (c2::isControllerSocketEnabled(configuration_)) {
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 5f1097fb4..009c4293a 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -38,6 +38,7 @@
 #include "utils/file/FileManager.h"
 #include "utils/file/FileSystem.h"
 #include "http/BaseHTTPClient.h"
+#include "utils/file/PathUtils.h"
 #include "utils/Environment.h"
 #include "utils/Monitors.h"
 #include "utils/StringUtils.h"
@@ -46,6 +47,7 @@
 #include "utils/Id.h"
 #include "c2/C2Utils.h"
 #include "c2/protocols/RESTSender.h"
+#include "rapidjson/error/en.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -54,7 +56,8 @@ namespace org::apache::nifi::minifi::c2 {
 C2Agent::C2Agent(std::shared_ptr<Configure> configuration,
                  std::weak_ptr<state::response::NodeReporter> node_reporter,
                  std::shared_ptr<utils::file::FileSystem> filesystem,
-                 std::function<void()> request_restart)
+                 std::function<void()> request_restart,
+                 utils::file::AssetManager* asset_manager)
     : heart_beat_period_(3s),
       max_c2_responses(5),
       configuration_(std::move(configuration)),
@@ -62,7 +65,8 @@ C2Agent::C2Agent(std::shared_ptr<Configure> configuration,
       filesystem_(std::move(filesystem)),
       thread_pool_(2, nullptr, "C2 threadpool"),
       request_restart_(std::move(request_restart)),
-      last_run_(std::chrono::steady_clock::now()) {
+      last_run_(std::chrono::steady_clock::now()),
+      asset_manager_(asset_manager) {
   if (!configuration_->getAgentClass()) {
     logger_->log_info("Agent class is not predefined");
   }
@@ -251,7 +255,7 @@ void C2Agent::serializeMetrics(C2Payload &metric_payload, 
const std::string &nam
     } else {
       C2ContentResponse response(metric_payload.getOperation());
       response.name = name;
-      response.operation_arguments[metric.name] = metric.value;
+      response.operation_arguments[metric.name] = C2Value{metric.value};
       metric_payload.addContent(std::move(response), is_collapsible);
     }
   }
@@ -381,6 +385,9 @@ void C2Agent::handle_c2_server_response(const 
C2ContentResponse &resp) {
       }
       break;
     }
+    case Operation::sync:
+      handle_sync(resp);
+      break;
     default:
       break;
       // do nothing
@@ -401,7 +408,7 @@ C2Payload C2Agent::prepareConfigurationOptions(const 
C2ContentResponse &resp) co
       if (configuration_->get(key, value)) {
         C2ContentResponse option(Operation::acknowledge);
         option.name = key;
-        option.operation_arguments[key] = value;
+        option.operation_arguments[key] = C2Value{value};
         options.addContent(std::move(option));
       }
     }
@@ -530,7 +537,7 @@ void C2Agent::handle_describe(const C2ContentResponse 
&resp) {
           for (const auto &line : trace.getTraces()) {
             C2ContentResponse option(Operation::acknowledge);
             option.name = line;
-            option.operation_arguments[line] = line;
+            option.operation_arguments[line] = C2Value{line};
             options.addContent(std::move(option));
           }
           response.addPayload(std::move(options));
@@ -553,7 +560,7 @@ void C2Agent::handle_describe(const C2ContentResponse 
&resp) {
           for (const auto& kv : core_component_state.second) {
             C2ContentResponse entry(Operation::acknowledge);
             entry.name = kv.first;
-            entry.operation_arguments[kv.first] = kv.second;
+            entry.operation_arguments[kv.first] = C2Value{kv.second};
             state.addContent(std::move(entry));
           }
           states.addPayload(std::move(state));
@@ -608,12 +615,18 @@ void C2Agent::handlePropertyUpdate(const 
C2ContentResponse &resp) {
   };
 
   for (const auto& [name, value] : resp.operation_arguments) {
-    bool persist = (
-        value.getAnnotation("persist")
-        | utils::transform(&AnnotatedValue::to_string)
-        | utils::andThen(utils::string::toBool)).value_or(true);
-    PropertyChangeLifetime lifetime = persist ? 
PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT;
-    changeUpdateState(update_property(name, value.to_string(), lifetime));
+    if (auto* json_val = value.json()) {
+      if (json_val->IsObject() && json_val->HasMember("value")) {
+        PropertyChangeLifetime lifetime = PropertyChangeLifetime::PERSISTENT;
+        if (json_val->HasMember("persist")) {
+          lifetime = (*json_val)["persist"].GetBool() ? 
PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT;
+        }
+        std::string property_value{(*json_val)["value"].GetString(), 
(*json_val)["value"].GetStringLength()};
+        changeUpdateState(update_property(name, property_value, lifetime));
+        continue;
+      }
+    }
+    changeUpdateState(update_property(name, value.to_string(), 
PropertyChangeLifetime::PERSISTENT));
   }
   // apply changes and persist properties requested to be persisted
   const bool propertyWasUpdated = result == state::UpdateState::FULLY_APPLIED 
|| result == state::UpdateState::PARTIALLY_APPLIED;
@@ -700,6 +713,160 @@ void C2Agent::handle_transfer(const C2ContentResponse 
&resp) {
   }
 }
 
+/**
+ * Handles a C2 sync request.
+ *
+ * Validates the 'globalHash' and 'resourceList' arguments, builds an asset layout from the
+ * resources of type "ASSET" and asks the asset manager to synchronize to that layout, fetching
+ * files through the C2 protocol. Every failure is logged and acknowledged with SET_ERROR,
+ * success is acknowledged with FULLY_APPLIED.
+ *
+ * @param resp the sync command received from the C2 server
+ */
+void C2Agent::handle_sync(const org::apache::nifi::minifi::c2::C2ContentResponse &resp) {
+  logger_->log_info("Requested resource synchronization");
+  // logs the problem and reports it to the server as a failed acknowledgement
+  const auto send_error = [&] (std::string_view error) {
+    logger_->log_error("{}", error);
+    C2Payload response(Operation::acknowledge, state::UpdateState::SET_ERROR, resp.ident, true);
+    response.setRawData(as_bytes(std::span(error.begin(), error.end())));
+    enqueue_c2_response(std::move(response));
+  };
+
+  if (!asset_manager_) {
+    send_error("Internal error: no asset manager");
+    return;
+  }
+
+  SyncOperand operand = SyncOperand::resource;
+  try {
+    operand = utils::enumCast<SyncOperand>(resp.name, true);
+  } catch(const std::runtime_error&) {
+    send_error("Unknown operand '" + resp.name + "'");
+    return;
+  }
+
+  gsl_Assert(operand == SyncOperand::resource);
+
+  // 'globalHash' must be a json object with a string 'digest' member
+  const auto global_hash_it = resp.operation_arguments.find("globalHash");
+  if (global_hash_it == resp.operation_arguments.end()) {
+    send_error("Malformed request, missing 'globalHash' argument");
+    return;
+  }
+
+  const rapidjson::Document* global_hash = global_hash_it->second.json();
+  if (!global_hash) {
+    send_error("Argument 'globalHash' is malformed");
+    return;
+  }
+
+  if (!global_hash->IsObject()) {
+    send_error("Malformed request, 'globalHash' is not an object");
+    return;
+  }
+
+  const auto digest_it = global_hash->FindMember("digest");
+  if (digest_it == global_hash->MemberEnd()) {
+    send_error("Malformed request, 'globalHash' has no member 'digest'");
+    return;
+  }
+  if (!digest_it->value.IsString()) {
+    send_error("Malformed request, 'globalHash.digest' is not a string");
+    return;
+  }
+
+  utils::file::AssetLayout asset_layout;
+  asset_layout.digest = std::string{digest_it->value.GetString(), digest_it->value.GetStringLength()};
+
+  // 'resourceList' must be a json array of resource descriptions
+  const auto resource_list_it = resp.operation_arguments.find("resourceList");
+  if (resource_list_it == resp.operation_arguments.end()) {
+    send_error("Malformed request, missing 'resourceList' argument");
+    return;
+  }
+
+  const rapidjson::Document* resource_list = resource_list_it->second.json();
+  if (!resource_list) {
+    send_error("Argument 'resourceList' is malformed");
+    return;
+  }
+  if (!resource_list->IsArray()) {
+    send_error("Malformed request, 'resourceList' is not an array");
+    return;
+  }
+
+  for (rapidjson::SizeType resource_idx = 0; resource_idx < resource_list->Size(); ++resource_idx) {
+    const auto& resource = (*resource_list)[resource_idx];
+    if (!resource.IsObject()) {
+      send_error(fmt::format("Malformed request, 'resourceList[{}]' is not an object", resource_idx));
+      return;
+    }
+    // reads a mandatory string member of the current resource
+    const auto read_string_member = [&] (const char* key) -> nonstd::expected<std::string_view, std::string> {
+      const auto member_it = resource.FindMember(key);
+      if (member_it == resource.MemberEnd()) {
+        return nonstd::make_unexpected(fmt::format("Malformed request, 'resourceList[{}]' has no member '{}'", resource_idx, key));
+      }
+      const auto& member = member_it->value;
+      if (!member.IsString()) {
+        return nonstd::make_unexpected(fmt::format("Malformed request, 'resourceList[{}].{}' is not a string", resource_idx, key));
+      }
+      return std::string_view{member.GetString(), member.GetStringLength()};
+    };
+
+    const auto id = read_string_member("resourceId");
+    if (!id.has_value()) {
+      send_error(id.error());
+      return;
+    }
+    const auto name = read_string_member("resourceName");
+    if (!name.has_value()) {
+      send_error(name.error());
+      return;
+    }
+    const auto type = read_string_member("resourceType");
+    if (!type.has_value()) {
+      send_error(type.error());
+      return;
+    }
+    // resources of any other type are skipped before their path and url are looked at
+    if (*type != "ASSET") {
+      logger_->log_info("Resource (id = '{}', name = '{}') with type '{}' is not yet supported", *id, *name, *type);
+      continue;
+    }
+    const auto path = read_string_member("resourcePath");
+    if (!path.has_value()) {
+      send_error(path.error());
+      return;
+    }
+    const auto url = read_string_member("url");
+    if (!url.has_value()) {
+      send_error(url.error());
+      return;
+    }
+
+    auto full_path = std::filesystem::path{*path} / *name;  // NOLINT(whitespace/braces)
+
+    if (const auto path_valid = utils::file::validateRelativeFilePath(full_path); !path_valid) {
+      send_error(path_valid.error());
+      return;
+    }
+
+    asset_layout.assets.insert(utils::file::AssetDescription{
+        .id = std::string{*id},
+        .path = full_path,
+        .url = std::string{*url}
+    });
+  }
+
+  // downloads a single file through the C2 protocol, resolving the url first
+  const auto fetch = [&] (std::string_view url) -> nonstd::expected<std::vector<std::byte>, std::string> {
+    const auto resolved_url = resolveUrl(std::string{url});
+    if (!resolved_url) {
+      return nonstd::make_unexpected("Couldn't resolve url");
+    }
+    C2Payload file_response = protocol_->fetch(resolved_url.value());
+
+    if (file_response.getStatus().getState() != state::UpdateState::READ_COMPLETE) {
+      return nonstd::make_unexpected("Failed to fetch file from " + resolved_url.value());
+    }
+
+    return std::move(file_response).moveRawData();
+  };
+
+  if (const auto sync_result = asset_manager_->sync(asset_layout, fetch); !sync_result) {
+    send_error(sync_result.error());
+    return;
+  }
+
+  C2Payload response(Operation::acknowledge, state::UpdateState::FULLY_APPLIED, resp.ident, true);
+  enqueue_c2_response(std::move(response));
+}
+
 utils::TaskRescheduleInfo C2Agent::produce() {
   // place priority on messages to send to the c2 server
   if (protocol_ != nullptr) {
@@ -789,6 +956,9 @@ std::optional<std::string> C2Agent::resolveUrl(const 
std::string& url) const {
     return url;
   }
   std::string base;
+  if (configuration_->get(Configuration::nifi_c2_rest_path_base, base)) {
+    return base + url;
+  }
   if (!configuration_->get(Configuration::nifi_c2_rest_url, "c2.rest.url", 
base)) {
     logger_->log_error("Missing C2 REST URL");
     return std::nullopt;
@@ -891,27 +1061,6 @@ static auto make_path(const std::string& str) {
   return std::filesystem::path(str);
 }
 
-static std::optional<std::string> validateFilePath(const 
std::filesystem::path& path) {
-  if (path.empty()) {
-    return "Empty file path";
-  }
-  if (!path.is_relative()) {
-    return "File path must be a relative path '" + path.string() + "'";
-  }
-  if (!path.has_filename()) {
-    return "Filename missing in output path '" + path.string() + "'";
-  }
-  if (path.filename() == "." || path.filename() == "..") {
-    return "Invalid filename '" + path.filename().string() + "'";
-  }
-  for (const auto& segment : path) {
-    if (segment == "..") {
-      return "Accessing parent directory is forbidden in file path '" + 
path.string() + "'";
-    }
-  }
-  return std::nullopt;
-}
-
 void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) {
   auto send_error = [&] (std::string_view error) {
     logger_->log_error("{}", error);
@@ -919,19 +1068,16 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& 
resp) {
     response.setRawData(as_bytes(std::span(error.begin(), error.end())));
     enqueue_c2_response(std::move(response));
   };
-  std::filesystem::path asset_dir = configuration_->getHome() / "asset";
-  if (auto asset_dir_str = 
configuration_->get(Configuration::nifi_asset_directory)) {
-    asset_dir = asset_dir_str.value();
-  }
 
   // output file
   std::filesystem::path file_path;
-  if (auto file_rel = resp.getArgument("file") | utils::transform(make_path)) {
-    if (auto error = validateFilePath(file_rel.value())) {
-      send_error(error.value());
+  if (auto file_rel = resp.getStringArgument("file") | 
utils::transform(make_path)) {
+    auto result = utils::file::validateRelativeFilePath(file_rel.value());
+    if (!result) {
+      send_error(result.error());
       return;
     }
-    file_path = asset_dir / file_rel.value();
+    file_path = asset_manager_->getRoot() / file_rel.value();
   } else {
     send_error("Couldn't find 'file' argument");
     return;
@@ -939,7 +1085,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& 
resp) {
 
   // source url
   std::string url;
-  if (auto url_str = resp.getArgument("url")) {
+  if (auto url_str = resp.getStringArgument("url")) {
     if (auto resolved_url = resolveUrl(*url_str)) {
       url = resolved_url.value();
     } else {
@@ -953,7 +1099,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& 
resp) {
 
   // forceDownload
   bool force_download = false;
-  if (auto force_download_str = resp.getArgument("forceDownload")) {
+  if (auto force_download_str = resp.getStringArgument("forceDownload")) {
     if (utils::string::equalsIgnoreCase(force_download_str.value(), "true")) {
       force_download = true;
     } else if (utils::string::equalsIgnoreCase(force_download_str.value(), 
"false")) {
diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp
index e70413cd5..f21f60437 100644
--- a/libminifi/src/c2/C2Payload.cpp
+++ b/libminifi/src/c2/C2Payload.cpp
@@ -131,14 +131,14 @@ std::ostream& operator<<(std::ostream& out, const 
C2ContentResponse& response) {
     << "}";
 }
 
-std::ostream& operator<<(std::ostream& out, const AnnotatedValue& val) {
-  if (val.value_) {
-    out << '"' << val.value_->c_str() << '"';
+std::ostream& operator<<(std::ostream& out, const C2Value& val) {
+  if (auto* val_ptr = val.valueNode()) {
+    out << '"' << val_ptr->to_string() << '"';
   } else {
-    out << "<null>";
-  }
-  if (!val.annotations.empty()) {
-    out << val.annotations;
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    gsl::not_null(val.json())->Accept(writer);
+    out << std::string_view{buffer.GetString(), buffer.GetLength()};
   }
   return out;
 }
diff --git a/libminifi/src/c2/HeartbeatJsonSerializer.cpp 
b/libminifi/src/c2/HeartbeatJsonSerializer.cpp
index d15ecf71a..a25b47f77 100644
--- a/libminifi/src/c2/HeartbeatJsonSerializer.cpp
+++ b/libminifi/src/c2/HeartbeatJsonSerializer.cpp
@@ -60,9 +60,16 @@ static void serializeOperationInfo(rapidjson::Value& target, 
const C2Payload& pa
   target.AddMember("identifier", rapidjson::Value(id.c_str(), alloc), alloc);
 }
 
-static void setJsonStr(const std::string& key, const 
state::response::ValueNode& value, rapidjson::Value& parent, 
rapidjson::Document::AllocatorType& alloc) {  // NOLINT
+static void setJsonStr(const std::string& key, const c2::C2Value& value, 
rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) {
   rapidjson::Value valueVal;
-  auto base_type = value.getValue();
+
+  if (auto* json_val = value.json()) {
+    valueVal.CopyFrom(*json_val, alloc);
+    parent.AddMember(rapidjson::Value(key.c_str(), alloc), valueVal, alloc);
+    return;
+  }
+
+  auto base_type = gsl::not_null(value.valueNode())->getValue();
 
   auto type_index = base_type->getTypeIndex();
   if (auto sub_type = 
std::dynamic_pointer_cast<core::TransformableValue>(base_type)) {
@@ -156,9 +163,9 @@ static rapidjson::Value serializeConnectionQueues(const 
C2Payload& payload, std:
   updatedContent.name = uuid;
   adjusted.setLabel(uuid);
   adjusted.setIdentifier(uuid);
-  c2::AnnotatedValue nd;
+  c2::C2Value nd;
   // name should be what was previously the TLN ( top level node )
-  nd = name;
+  nd = C2Value{name};
   updatedContent.operation_arguments.insert(std::make_pair("name", nd));
   // the rvalue reference is an unfortunate side effect of the underlying API 
decision.
   adjusted.addContent(std::move(updatedContent), true);
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp 
b/libminifi/src/c2/protocols/RESTProtocol.cpp
index ca4f3faea..fee4b6327 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -28,6 +28,7 @@
 #include <string>
 #include <utility>
 
+#include "rapidjson/error/en.h"
 #include "core/TypedValues.h"
 #include "utils/gsl.h"
 #include "properties/Configuration.h"
@@ -36,26 +37,7 @@
 
 namespace org::apache::nifi::minifi::c2 {
 
-
-AnnotatedValue parseAnnotatedValue(const rapidjson::Value& jsonValue) {
-  AnnotatedValue result;
-  if (jsonValue.IsObject() && jsonValue.HasMember("value")) {
-    result = jsonValue["value"].GetString();
-    for (const auto& annotation : jsonValue.GetObject()) {
-      if (annotation.name.GetString() == std::string("value")) {
-        continue;
-      }
-      result.annotations[annotation.name.GetString()] = 
parseAnnotatedValue(annotation.value);
-    }
-  } else if (jsonValue.IsBool()) {
-    result = jsonValue.GetBool();
-  } else {
-    result = jsonValue.GetString();
-  }
-  return result;
-}
-
-C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, 
std::span<const std::byte> response) {
+C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, 
std::span<const std::byte> response) const {
   rapidjson::Document root;
 
   try {
@@ -123,7 +105,7 @@ C2Payload RESTProtocol::parseJsonResponse(const C2Payload 
&payload, std::span<co
         for (auto key : {"content", "args"}) {
           if (request.HasMember(key) && request[key].IsObject()) {
             for (const auto &member : request[key].GetObject()) {
-              new_command.operation_arguments[member.name.GetString()] = 
parseAnnotatedValue(member.value);
+              new_command.operation_arguments[member.name.GetString()] = 
C2Value{member.value};
             }
             break;
           }
@@ -136,6 +118,8 @@ C2Payload RESTProtocol::parseJsonResponse(const C2Payload 
&payload, std::span<co
       // we have a response for this request
       return new_payload;
       // }
+    } else {
+      logger_->log_error("Failed to parse json response: {} at {}", 
rapidjson::GetParseError_En(ok.Code()), ok.Offset());
     }
   } catch (...) {
   }
diff --git a/libminifi/src/c2/protocols/RESTSender.cpp 
b/libminifi/src/c2/protocols/RESTSender.cpp
index 98642b009..659ed51db 100644
--- a/libminifi/src/c2/protocols/RESTSender.cpp
+++ b/libminifi/src/c2/protocols/RESTSender.cpp
@@ -41,10 +41,23 @@ void 
RESTSender::initialize(core::controller::ControllerServiceProvider* control
   RESTProtocol::initialize(controller, configure);
   // base URL when one is not specified.
   if (nullptr != configure) {
+    std::optional<std::string> rest_base_path = 
configure->get(Configuration::nifi_c2_rest_path_base);
     std::string update_str;
     std::string ssl_context_service_str;
     configure->get(Configuration::nifi_c2_rest_url, "c2.rest.url", rest_uri_);
     configure->get(Configuration::nifi_c2_rest_url_ack, "c2.rest.url.ack", 
ack_uri_);
+    if (rest_uri_.starts_with("/")) {
+      if (!rest_base_path) {
+        throw Exception(ExceptionType::GENERAL_EXCEPTION, "Cannot use relative 
nifi.c2.rest.url unless the nifi.c2.rest.path.base is set");
+      }
+      rest_uri_ = rest_base_path.value() + rest_uri_;
+    }
+    if (ack_uri_.starts_with("/")) {
+      if (!rest_base_path) {
+        throw Exception(ExceptionType::GENERAL_EXCEPTION, "Cannot use relative 
nifi.c2.rest.url.ack unless the nifi.c2.rest.path.base is set");
+      }
+      ack_uri_ = rest_base_path.value() + ack_uri_;
+    }
     if (controller && 
configure->get(Configuration::nifi_c2_rest_ssl_context_service, 
"c2.rest.ssl.context.service", ssl_context_service_str)) {
       if (auto service = 
controller->getControllerService(ssl_context_service_str)) {
         ssl_context_service_ = 
std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
diff --git a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp 
b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
index c3e94daa2..28bf3082b 100644
--- a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
+++ b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
@@ -29,8 +29,8 @@ C2Payload FileUpdateTrigger::getAction() {
     C2ContentResponse resp(Operation::update);
     resp.ident = "triggered";
     resp.name = "configuration";
-    resp.operation_arguments["location"] = file_;
-    resp.operation_arguments["persist"] = "true";
+    resp.operation_arguments["location"] = C2Value{file_};
+    resp.operation_arguments["persist"] = C2Value{"true"};
     response_payload.addContent(std::move(resp));
     update_ = false;
     return response_payload;
diff --git a/libminifi/src/core/state/MetricsPublisherStore.cpp 
b/libminifi/src/core/state/MetricsPublisherStore.cpp
index 269cb6267..abd1550d8 100644
--- a/libminifi/src/core/state/MetricsPublisherStore.cpp
+++ b/libminifi/src/core/state/MetricsPublisherStore.cpp
@@ -23,9 +23,9 @@
 namespace org::apache::nifi::minifi::state {
 
 MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr<Configure> 
configuration, const 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& 
repository_metric_sources,
-  std::shared_ptr<core::FlowConfiguration> flow_configuration)
+  std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager)
     : configuration_(configuration),
-      
response_node_loader_(std::make_shared<response::ResponseNodeLoader>(std::move(configuration),
 repository_metric_sources, std::move(flow_configuration))) {
+      
response_node_loader_(std::make_shared<response::ResponseNodeLoader>(std::move(configuration),
 repository_metric_sources, std::move(flow_configuration), asset_manager)) {
 }
 
 void 
MetricsPublisherStore::initialize(core::controller::ControllerServiceProvider* 
controller, state::StateMonitor* update_sink) {
diff --git a/libminifi/src/core/state/nodes/AssetInformation.cpp 
b/libminifi/src/core/state/nodes/AssetInformation.cpp
new file mode 100644
index 000000000..4a243b607
--- /dev/null
+++ b/libminifi/src/core/state/nodes/AssetInformation.cpp
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/state/nodes/AssetInformation.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerFactory.h"
+
+namespace org::apache::nifi::minifi::state::response {
+
+// Obtains the logger of this response node; the asset manager is supplied later through setAssetManager().
+AssetInformation::AssetInformation()
+  : logger_(core::logging::LoggerFactory<AssetInformation>::getLogger()) {}
+
+// Stores the asset manager whose hash serialize() reports; a null manager is accepted but logged as an error.
+void AssetInformation::setAssetManager(utils::file::AssetManager* asset_manager) {
+  asset_manager_ = asset_manager;
+  if (asset_manager_) {
+    return;
+  }
+  logger_->log_error("No asset manager is provided, asset information will not be available");
+}
+
+// Returns a single "hash" node holding the asset manager's hash, or nothing when no asset manager is set.
+std::vector<SerializedResponseNode> AssetInformation::serialize() {
+  std::vector<SerializedResponseNode> nodes;
+  if (asset_manager_) {
+    SerializedResponseNode hash_node;
+    hash_node.name = "hash";
+    hash_node.value = asset_manager_->hash();
+    nodes.push_back(hash_node);
+  }
+  return nodes;
+}
+
+REGISTER_RESOURCE(AssetInformation, DescriptionOnly);
+
+}  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp 
b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index 58ff6ecea..8d9dc1845 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -25,6 +25,7 @@
 #include "core/state/nodes/QueueMetrics.h"
 #include "core/state/nodes/AgentInformation.h"
 #include "core/state/nodes/ConfigurationChecksums.h"
+#include "core/state/nodes/AssetInformation.h"
 #include "utils/gsl.h"
 #include "utils/RegexUtils.h"
 #include "utils/StringUtils.h"
@@ -33,10 +34,11 @@
 namespace org::apache::nifi::minifi::state::response {
 
 ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> 
configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repository_metric_sources,
-  std::shared_ptr<core::FlowConfiguration> flow_configuration)
+  std::shared_ptr<core::FlowConfiguration> flow_configuration, 
utils::file::AssetManager* asset_manager)
     : configuration_(std::move(configuration)),
       repository_metric_sources_(std::move(repository_metric_sources)),
-      flow_configuration_(std::move(flow_configuration)) {
+      flow_configuration_(std::move(flow_configuration)),
+      asset_manager_(asset_manager) {
 }
 
 void ResponseNodeLoader::clearConfigRoot() {
@@ -194,6 +196,13 @@ void 
ResponseNodeLoader::initializeConfigurationChecksums(const SharedResponseNo
   }
 }
 
+// Hands the asset manager over to the node if it is an AssetInformation node; other nodes are left untouched.
+void ResponseNodeLoader::initializeAssetInformation(const SharedResponseNode& response_node) const {
+  if (auto* const asset_information = dynamic_cast<state::response::AssetInformation*>(response_node.get())) {
+    asset_information->setAssetManager(asset_manager_);
+  }
+}
+
 void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& 
response_node) const {
   auto flowMonitor = 
dynamic_cast<state::response::FlowMonitor*>(response_node.get());
   if (flowMonitor == nullptr) {
@@ -231,6 +240,7 @@ std::vector<SharedResponseNode> 
ResponseNodeLoader::loadResponseNodes(const std:
     initializeAgentStatus(response_node);
     initializeConfigurationChecksums(response_node);
     initializeFlowMonitor(response_node);
+    initializeAssetInformation(response_node);
   }
   return response_nodes;
 }
diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp 
b/libminifi/src/core/state/nodes/SupportedOperations.cpp
index b0681415f..51f86c40b 100644
--- a/libminifi/src/core/state/nodes/SupportedOperations.cpp
+++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp
@@ -110,6 +110,10 @@ void 
SupportedOperations::fillProperties(SerializedResponseNode& properties, min
       }
       break;
     }
+    case minifi::c2::Operation::sync: {
+      serializeProperty<minifi::c2::SyncOperand>(properties);
+      break;
+    }
     default:
       break;
   }
diff --git a/libminifi/src/utils/file/AssetManager.cpp 
b/libminifi/src/utils/file/AssetManager.cpp
new file mode 100644
index 000000000..3444cb55c
--- /dev/null
+++ b/libminifi/src/utils/file/AssetManager.cpp
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/file/AssetManager.h"
+#include "utils/file/FileUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/writer.h"
+#include "core/logging/LoggerFactory.h"
+#include "utils/Hash.h"
+
+#undef GetObject  // windows.h #defines GetObject = GetObjectA or GetObjectW, 
which conflicts with rapidjson
+
+namespace org::apache::nifi::minifi::utils::file {
+
+// Resolves the asset root directory (the Configure::nifi_asset_directory property when it is set,
+// otherwise "<home>/asset"), obtains the class logger and loads the persisted state through refreshState().
+AssetManager::AssetManager(const Configure& configuration)
+    : root_(configuration.get(Configure::nifi_asset_directory).value_or((configuration.getHome() / "asset").string())),
+      logger_(core::logging::LoggerFactory<AssetManager>::getLogger()) {
+  refreshState();
+}
+
+// Rebuilds the in-memory asset layout from "<root>/.state".
+// The root directory and an empty '.state' file are created when they are missing. When the file cannot be
+// parsed or does not have the expected shape, the problem is logged and the layout stays cleared.
+// Entries whose file is not present below the root are logged and left out of the layout.
+void AssetManager::refreshState() {
+  std::lock_guard lock(mtx_);
+  state_.clear();
+
+  const auto state_file = root_ / ".state";
+  if (!FileUtils::exists(root_)) {
+    std::filesystem::create_directories(root_);
+  }
+  if (!FileUtils::exists(state_file)) {
+    std::ofstream{state_file, std::ios::binary} << R"({"digest": "", "assets": {}})";
+  }
+
+  const std::string raw_state = get_content(state_file);
+  rapidjson::Document doc;
+  const rapidjson::ParseResult parse_result = doc.Parse(raw_state.c_str(), raw_state.size());
+  if (parse_result.IsError()) {
+    logger_->log_error("Failed to parse asset '.state' file, not a valid json file");
+    return;
+  }
+  if (!doc.IsObject()) {
+    logger_->log_error("Asset '.state' file is malformed");
+    return;
+  }
+
+  const auto digest_member = doc.FindMember("digest");
+  if (digest_member == doc.MemberEnd()) {
+    logger_->log_error("Asset '.state' file is malformed, missing 'digest'");
+    return;
+  }
+  if (!digest_member->value.IsString()) {
+    logger_->log_error("Asset '.state' file is malformed, 'digest' is not a string");
+    return;
+  }
+
+  const auto assets_member = doc.FindMember("assets");
+  if (assets_member == doc.MemberEnd()) {
+    logger_->log_error("Asset '.state' file is malformed, missing 'assets'");
+    return;
+  }
+  if (!assets_member->value.IsObject()) {
+    logger_->log_error("Asset '.state' file is malformed, 'assets' is not an object");
+    return;
+  }
+
+  AssetLayout loaded_layout;
+  loaded_layout.digest = std::string{digest_member->value.GetString(), digest_member->value.GetStringLength()};
+
+  for (auto& [id, entry] : assets_member->value.GetObject()) {
+    const std::string_view asset_id{id.GetString(), id.GetStringLength()};
+    if (!entry.IsObject()) {
+      logger_->log_error("Asset '.state' file is malformed, 'assets.{}' is not an object", asset_id);
+      return;
+    }
+    const auto path_member = entry.FindMember("path");
+    if (path_member == entry.MemberEnd() || !path_member->value.IsString()) {
+      logger_->log_error("Asset '.state' file is malformed, 'assets.{}.path' does not exist or is not a string", asset_id);
+      return;
+    }
+    const auto url_member = entry.FindMember("url");
+    if (url_member == entry.MemberEnd() || !url_member->value.IsString()) {
+      logger_->log_error("Asset '.state' file is malformed, 'assets.{}.url' does not exist or is not a string", asset_id);
+      return;
+    }
+
+    AssetDescription description;
+    description.id = std::string{asset_id};
+    description.path = std::string{path_member->value.GetString(), path_member->value.GetStringLength()};
+    description.url = std::string{url_member->value.GetString(), url_member->value.GetStringLength()};
+
+    if (!FileUtils::exists(root_ / description.path)) {
+      logger_->log_error("Asset '.state' file contains entry '{}' that does not exist on the filesystem at '{}'",
+                         asset_id, (root_ / description.path).string());
+      continue;
+    }
+    loaded_layout.assets.insert(std::move(description));
+  }
+  state_ = std::move(loaded_layout);
+}
+
+// Returns the digest of the current asset layout, or the literal "null" while the digest is empty.
+std::string AssetManager::hash() const {
+  std::lock_guard lock(mtx_);
+  if (state_.digest.empty()) {
+    return "null";
+  }
+  return state_.digest;
+}
+
+/**
+ * Brings the assets below the root directory in line with the requested layout.
+ *
+ * Assets whose id is not part of the current state are downloaded through `fetch`, assets that are no longer
+ * listed in `layout` are deleted, and the resulting state is written to the '.state' file through persist().
+ * A downloaded asset only becomes part of the new state after its content was written successfully, and the
+ * digest of `layout` is only adopted when no download or write failed; otherwise the previous digest is kept.
+ *
+ * @param layout the desired set of assets together with its digest
+ * @param fetch  callback returning the content behind an asset url, or an error message
+ * @return nothing on success, otherwise the newline separated list of download/write errors
+ */
+nonstd::expected<void, std::string> AssetManager::sync(
+    const AssetLayout& layout,
+    const std::function<nonstd::expected<std::vector<std::byte>, std::string>(std::string_view /*url*/)>& fetch) {
+  logger_->log_info("Synchronizing assets");
+  std::lock_guard lock(mtx_);
+  AssetLayout new_state{
+    .digest = state_.digest,
+    .assets = {}
+  };
+  std::string sync_errors;
+  std::vector<std::pair<AssetDescription, std::vector<std::byte>>> fetched_assets;
+  for (auto& new_entry : layout.assets) {
+    if (std::find_if(state_.assets.begin(), state_.assets.end(), [&] (auto& entry) {return entry.id == new_entry.id;}) == state_.assets.end()) {
+      logger_->log_info("Fetching asset (id = '{}', path = '{}') from {}", new_entry.id, new_entry.path.string(), new_entry.url);
+      if (auto data = fetch(new_entry.url)) {
+        // moved, not copied: the fetched buffer is not needed anywhere else
+        fetched_assets.emplace_back(new_entry, std::move(*data));
+      } else {
+        logger_->log_error("Failed to fetch asset (id = '{}', path = '{}') from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url, data.error());
+        sync_errors += "Failed to fetch '" + new_entry.id + "' from '" + new_entry.url + "': " + data.error() + "\n";
+      }
+    } else {
+      logger_->log_info("Asset (id = '{}', path = '{}') already exists", new_entry.id, new_entry.path.string());
+      new_state.assets.insert(new_entry);
+    }
+  }
+
+  // obsolete files are removed before the new ones are written, a new asset may reuse the path of a removed one
+  for (auto& old_entry : state_.assets) {
+    if (std::find_if(layout.assets.begin(), layout.assets.end(), [&] (auto& entry) {return entry.id == old_entry.id;}) == layout.assets.end()) {
+      logger_->log_info("We no longer need asset (id = '{}', path = '{}')", old_entry.id, old_entry.path.string());
+      std::filesystem::remove(root_ / old_entry.path);
+    }
+  }
+
+  for (auto& [asset, content] : fetched_assets) {
+    const auto target = root_ / asset.path;
+    create_dir(target.parent_path());
+    std::ofstream file{target, std::ios::binary};
+    file.write(reinterpret_cast<const char*>(content.data()), gsl::narrow<std::streamsize>(content.size()));
+    file.close();
+    if (!file) {
+      // an asset that could not be stored must not be reported as present
+      logger_->log_error("Failed to write asset (id = '{}', path = '{}') to '{}'", asset.id, asset.path.string(), target.string());
+      sync_errors += "Failed to write '" + asset.id + "' to '" + target.string() + "'\n";
+      continue;
+    }
+    new_state.assets.insert(std::move(asset));
+  }
+
+  if (sync_errors.empty()) {
+    new_state.digest = layout.digest;
+  }
+
+  state_ = std::move(new_state);
+  persist();
+
+  if (!sync_errors.empty()) {
+    return nonstd::make_unexpected(sync_errors);
+  }
+
+  return {};
+}
+
+// Serializes the current asset layout as {"digest": ..., "assets": {<id>: {"path": ..., "url": ...}}}
+// and overwrites "<root>/.state" with the result.
+void AssetManager::persist() const {
+  std::lock_guard lock(mtx_);
+  rapidjson::Document doc{rapidjson::kObjectType};
+  auto& allocator = doc.GetAllocator();
+
+  rapidjson::Value assets{rapidjson::kObjectType};
+  for (const auto& asset : state_.assets) {
+    rapidjson::Value asset_json{rapidjson::kObjectType};
+    asset_json.AddMember(rapidjson::StringRef("path"), rapidjson::Value(asset.path.generic_string(), allocator), allocator);
+    asset_json.AddMember(rapidjson::StringRef("url"), rapidjson::StringRef(asset.url), allocator);
+    assets.AddMember(rapidjson::StringRef(asset.id), asset_json, allocator);
+  }
+
+  // member order ("digest" first, then "assets") is part of the written file content
+  doc.AddMember(rapidjson::StringRef("digest"), rapidjson::Value{state_.digest, allocator}, allocator);
+  doc.AddMember(rapidjson::StringRef("assets"), assets, allocator);
+
+  rapidjson::StringBuffer serialized;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(serialized);
+  doc.Accept(writer);
+
+  std::ofstream state_file{root_ / ".state", std::ios::binary};
+  state_file.write(serialized.GetString(), gsl::narrow<std::streamsize>(serialized.GetSize()));
+}
+
+// Returns a copy of the asset root directory; the copy is taken while holding mtx_.
+std::filesystem::path AssetManager::getRoot() const {
+  std::lock_guard lock(mtx_);
+  return root_;
+}
+
+}  // namespace org::apache::nifi::minifi::utils::file
diff --git a/libminifi/test/integration/C2AssetSyncTest.cpp 
b/libminifi/test/integration/C2AssetSyncTest.cpp
new file mode 100644
index 000000000..5731025eb
--- /dev/null
+++ b/libminifi/test/integration/C2AssetSyncTest.cpp
@@ -0,0 +1,280 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+#include <string>
+#include <fstream>
+#include <iterator>
+
+#include "integration/HTTPIntegrationBase.h"
+#include "integration/HTTPHandlers.h"
+#include "utils/file/FileUtils.h"
+#include "utils/file/AssetManager.h"
+#include "unit/TestUtils.h"
+#include "unit/Catch.h"
+
+namespace org::apache::nifi::minifi::test {
+
+// Test HTTP handler that answers every GET request with the fixed content it was constructed with.
+class FileProvider : public ServerAwareHandler {
+ public:
+  explicit FileProvider(std::string file_content): file_content_(std::move(file_content)) {}
+
+  // Sends a "200 OK" text/plain response carrying the stored content; always reports the request as handled.
+  bool handleGet(CivetServer* /*server*/, struct mg_connection* conn) override {
+    // %zu is the conversion for size_t; %lu is wrong where unsigned long is narrower than size_t (e.g. 64-bit Windows)
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                    "text/plain\r\nContent-Length: %zu\r\nConnection: close\r\n\r\n",
+              file_content_.length());
+    // write exactly the announced number of bytes, "%s" would stop at the first NUL byte
+    mg_write(conn, file_content_.data(), file_content_.size());
+    return true;
+  }
+
+ private:
+  std::string file_content_;
+};
+
+// Heartbeat handler playing the C2 server side of asset synchronization: it keeps the set of assets the agent
+// is expected to have and answers a heartbeat with a "sync" / "resource" operation whenever the hash reported
+// by the agent differs from the hash calculated over that set.
+class C2HeartbeatHandler : public HeartbeatHandler {
+ public:
+  using HeartbeatHandler::HeartbeatHandler;
+  using AssetDescription = org::apache::nifi::minifi::utils::file::AssetDescription;
+
+  // Records the asset hash reported in the heartbeat ("resourceInfo.hash") and, when it does not match the
+  // expected hash, responds with a sync operation listing every expected asset.
+  void handleHeartbeat(const rapidjson::Document& root, struct mg_connection* conn) override {
+    auto& asset_info_node = root["resourceInfo"];
+    auto& asset_hash_node = asset_info_node["hash"];
+    std::string asset_hash{asset_hash_node.GetString(), asset_hash_node.GetStringLength()};
+
+    std::vector<C2Operation> operations;
+    {
+      std::lock_guard guard(asset_mtx_);
+      agent_asset_hash_ = asset_hash;
+      // computed once while the lock is held, used for both the comparison and the response
+      const std::string expected_hash = calculateAssetHash();
+      if (asset_hash != expected_hash) {
+        std::unordered_map<std::string, c2::C2Value> args;
+        rapidjson::Document global_hash_doc{rapidjson::kObjectType};
+        global_hash_doc.AddMember("digest", expected_hash, global_hash_doc.GetAllocator());
+        args["globalHash"] = minifi::c2::C2Value{std::move(global_hash_doc)};
+        rapidjson::Document resource_list_doc{rapidjson::kArrayType};
+
+        for (auto& asset : expected_assets_) {
+          rapidjson::Value resource_obj{rapidjson::kObjectType};
+          resource_obj.AddMember("resourceId", asset.id, resource_list_doc.GetAllocator());
+          resource_obj.AddMember("resourceName", asset.path.filename().string(), resource_list_doc.GetAllocator());
+          resource_obj.AddMember("resourceType", "ASSET", resource_list_doc.GetAllocator());
+          resource_obj.AddMember("resourcePath", asset.path.parent_path().string(), resource_list_doc.GetAllocator());
+          resource_obj.AddMember("url", asset.url, resource_list_doc.GetAllocator());
+          resource_list_doc.PushBack(resource_obj, resource_list_doc.GetAllocator());
+        }
+        args["resourceList"] = minifi::c2::C2Value{std::move(resource_list_doc)};
+
+        operations.push_back(C2Operation{
+          .operation = "sync",
+          .operand = "resource",
+          .operation_id = std::to_string(next_op_id_++),
+          .args = std::move(args)
+        });
+      }
+    }
+    sendHeartbeatResponse(operations, conn);
+  }
+
+  // Adds an asset to the expected set.
+  void addAsset(std::string id, std::string path, std::string url) {
+    std::lock_guard guard(asset_mtx_);
+    expected_assets_.insert(AssetDescription{
+      .id = std::move(id),
+      .path = std::move(path),
+      .url = std::move(url)
+    });
+  }
+
+  // Removes the asset with the given id from the expected set.
+  void removeAsset(std::string id) {
+    std::lock_guard guard{asset_mtx_};
+    expected_assets_.erase(AssetDescription{.id = std::move(id), .path = {}, .url = {}});
+  }
+
+  // Returns the hash from the most recent heartbeat, or nullopt before the first heartbeat was handled.
+  std::optional<std::string> getAgentAssetHash() const {
+    std::lock_guard lock(asset_mtx_);
+    return agent_asset_hash_;
+  }
+
+  // Combines the hashes of the expected asset ids into the digest sent to the agent.
+  std::string calculateAssetHash() const {
+    std::lock_guard guard{asset_mtx_};
+    size_t hash_value{0};
+    for (auto& asset : expected_assets_) {
+      hash_value = minifi::utils::hash_combine(hash_value, std::hash<std::string>{}(asset.id));
+    }
+    return std::to_string(hash_value);
+  }
+
+  // Renders the expected set as a json document with a "digest" string and an "assets" object
+  // mapping each id to its "path" and "url"; the test compares it with the agent's '.state' file.
+  std::string assetState() const {
+    std::lock_guard guard{asset_mtx_};
+    rapidjson::Document doc;
+    doc.SetObject();
+    doc.AddMember(rapidjson::StringRef("digest"), rapidjson::Value{calculateAssetHash(), doc.GetAllocator()}, doc.GetAllocator());
+    doc.AddMember(rapidjson::StringRef("assets"), rapidjson::Value{rapidjson::kObjectType}, doc.GetAllocator());
+    for (auto& asset : expected_assets_) {
+      auto path_str = asset.path.string();
+      doc["assets"].AddMember(rapidjson::StringRef(asset.id), rapidjson::kObjectType, doc.GetAllocator());
+      doc["assets"][asset.id].AddMember(rapidjson::StringRef("path"), rapidjson::Value(path_str, doc.GetAllocator()), doc.GetAllocator());
+      doc["assets"][asset.id].AddMember(rapidjson::StringRef("url"), rapidjson::StringRef(asset.url), doc.GetAllocator());
+    }
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    doc.Accept(writer);
+
+    return {buffer.GetString(), buffer.GetSize()};
+  }
+
+ private:
+  // recursive: several methods call calculateAssetHash() while already holding the lock
+  mutable std::recursive_mutex asset_mtx_;
+  std::set<AssetDescription> expected_assets_;
+
+  std::optional<std::string> agent_asset_hash_;
+
+  std::atomic<size_t> next_op_id_{1};
+};
+
+// Integration harness for the asset sync test: configures C2 over RESTSender with AssetInformation among
+// the root classes and delegates the assertions to a verifier callback supplied by the test case.
+class VerifyC2AssetSync : public VerifyC2Base {
+ public:
+  // Sets the C2 related properties used by this test.
+  void configureC2() override {
+    configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
+    configuration->set("nifi.c2.enable", "true");
+    configuration->set("nifi.c2.agent.heartbeat.period", "100");
+    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation");
+  }
+
+  // Invokes the callback registered through setVerifier(); the callback has to be set beforehand.
+  void runAssertions() override {
+    verify_();
+  }
+
+  // Stores the callback that runAssertions() invokes.
+  void setVerifier(std::function<void()> verify) {
+    verify_ = std::move(verify);
+  }
+
+ private:
+  std::function<void()> verify_;
+};
+
+// End-to-end check of the C2 asset synchronization: the agent has to download the three expected assets,
+// then follow a change of the expected set (two assets removed, one replaced under the same path).
+TEST_CASE("C2AssetSync", "[c2test]") {
+  TestController controller;
+
+  // setup minifi home
+  const std::filesystem::path home_dir = controller.createTempDirectory();
+  const auto asset_dir = home_dir / "asset";
+
+  std::filesystem::current_path(home_dir);
+  // restore the working directory on every exit path of the test
+  auto wd_guard = gsl::finally([] {
+    std::filesystem::current_path(minifi::utils::file::get_executable_dir());
+  });
+
+  C2AcknowledgeHandler ack_handler;
+  std::string file_A = "hello from file A";
+  FileProvider file_A_provider{file_A};
+  std::string file_B = "hello from file B";
+  FileProvider file_B_provider{file_B};
+  std::string file_C = "hello from file C";
+  FileProvider file_C_provider{file_C};
+  std::string file_A_v2 = "hello from file A version 2";
+  FileProvider file_Av2_provider{file_A_v2};
+  C2HeartbeatHandler hb_handler{std::make_shared<minifi::Configure>()};
+
+  VerifyC2AssetSync harness;
+  harness.setUrl("http://localhost:0/api/file/A.txt", &file_A_provider);
+  harness.setUrl("http://localhost:0/api/file/Av2.txt", &file_Av2_provider);
+  harness.setUrl("http://localhost:0/api/file/B.txt", &file_B_provider);
+  harness.setUrl("http://localhost:0/api/file/C.txt", &file_C_provider);
+
+  std::string absolute_file_A_url = "http://localhost:" + harness.getWebPort() + "/api/file/A.txt";
+
+  hb_handler.addAsset("Av1", "A.txt", "/api/file/A.txt");
+  hb_handler.addAsset("Bv1", "nested/dir/B.txt", "/api/file/B.txt");
+  hb_handler.addAsset("Cv1", "nested/C.txt", "/api/file/C.txt");
+
+  harness.setUrl("http://localhost:0/api/heartbeat", &hb_handler);
+  harness.setUrl("http://localhost:0/api/acknowledge", &ack_handler);
+  harness.setC2Url("/api/heartbeat", "/api/acknowledge");
+
+  // maps every file found below the asset directory to its content
+  auto get_asset_structure = [&] () {
+    std::unordered_map<std::string, std::string> contents;
+    for (auto& [dir, file] : minifi::utils::file::list_dir_all(asset_dir, controller.getLogger())) {
+      contents[(dir / file).string()] = minifi::utils::file::get_content(dir / file);
+    }
+    return contents;
+  };
+
+  // fails the test, after logging both sides, when the asset directory differs from the expectation
+  auto require_asset_structure = [&] (const std::unordered_map<std::string, std::string>& expected_assets) {
+    auto actual_assets = get_asset_structure();
+    if (actual_assets != expected_assets) {
+      controller.getLogger()->log_error("Mismatch between expected and actual assets");
+      for (auto& [path, content] : expected_assets) {
+        controller.getLogger()->log_error("Expected asset at {}: {}", path, content);
+      }
+      for (auto& [path, content] : actual_assets) {
+        controller.getLogger()->log_error("Actual asset at {}: {}", path, content);
+      }
+      REQUIRE(false);
+    }
+  };
+
+  harness.setVerifier([&] () {
+    REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] {
+      std::cout << "calculated hash = " << hb_handler.calculateAssetHash() << std::endl;
+      std::cout << "reported hash = " << hb_handler.getAgentAssetHash().value_or("<missing>") << std::endl;
+      return hb_handler.calculateAssetHash() == hb_handler.getAgentAssetHash();
+    }));
+
+    require_asset_structure({
+        {(asset_dir / "A.txt").string(), file_A},
+        {(asset_dir / "nested" / "dir" / "B.txt").string(), file_B},
+        {(asset_dir / "nested" / "C.txt").string(), file_C},
+        {(asset_dir / ".state").string(), hb_handler.assetState()}
+    });
+
+    hb_handler.removeAsset("Av1");
+    hb_handler.removeAsset("Cv1");
+    hb_handler.addAsset("Av2", "A.txt", "/api/file/Av2.txt");
+
+
+    REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] {return hb_handler.calculateAssetHash() == hb_handler.getAgentAssetHash();}));
+
+    require_asset_structure({
+        {(asset_dir / "A.txt").string(), file_A_v2},
+        {(asset_dir / "nested" / "dir" / "B.txt").string(), file_B},
+        {(asset_dir / ".state").string(), hb_handler.assetState()}
+    });
+  });
+
+  harness.run();
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/integration/C2ClearCoreComponentStateTest.cpp 
b/libminifi/test/integration/C2ClearCoreComponentStateTest.cpp
index 07081d815..a936bcd88 100644
--- a/libminifi/test/integration/C2ClearCoreComponentStateTest.cpp
+++ b/libminifi/test/integration/C2ClearCoreComponentStateTest.cpp
@@ -97,7 +97,7 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler 
{
         break;
       case FlowState::FIRST_DESCRIBE_ACK:
       case FlowState::CLEAR_SENT: {
-        sendHeartbeatResponse("CLEAR", "corecomponentstate", "889346", conn, { 
{"corecomponent1", "TailFile1"} });
+        sendHeartbeatResponse("CLEAR", "corecomponentstate", "889346", conn, { 
{"corecomponent1", minifi::c2::C2Value{"TailFile1"}} });
         flow_state_ = FlowState::CLEAR_SENT;
         break;
       }
diff --git a/libminifi/test/integration/C2DescribeMetricsTest.cpp 
b/libminifi/test/integration/C2DescribeMetricsTest.cpp
index 692e0e37b..9e5bcbcdf 100644
--- a/libminifi/test/integration/C2DescribeMetricsTest.cpp
+++ b/libminifi/test/integration/C2DescribeMetricsTest.cpp
@@ -64,11 +64,11 @@ class MetricsHandler: public HeartbeatHandler {
   void handleHeartbeat(const rapidjson::Document&, struct mg_connection* conn) 
override {
     switch (state_) {
       case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
-        sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, 
{{"metricsClass", "GetFileMetrics"}});
+        sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, 
{{"metricsClass", minifi::c2::C2Value{"GetFileMetrics"}}});
         break;
       }
       case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
-        sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, 
{{"metricsClass", "QueueMetrics"}});
+        sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, 
{{"metricsClass", minifi::c2::C2Value{"QueueMetrics"}}});
         break;
       }
       case TestState::DESCRIBE_ALL_METRICS: {
diff --git a/libminifi/test/integration/C2MetricsTest.cpp 
b/libminifi/test/integration/C2MetricsTest.cpp
index 4d356fc2b..39a0a19c7 100644
--- a/libminifi/test/integration/C2MetricsTest.cpp
+++ b/libminifi/test/integration/C2MetricsTest.cpp
@@ -62,7 +62,7 @@ class MetricsHandler: public HeartbeatHandler {
   explicit MetricsHandler(std::atomic_bool& metrics_updated_successfully, 
std::shared_ptr<minifi::Configure> configuration, const std::filesystem::path& 
replacement_config_path)
     : HeartbeatHandler(std::move(configuration)),
       metrics_updated_successfully_(metrics_updated_successfully),
-      
replacement_config_(getReplacementConfigAsJsonValue(replacement_config_path.string()))
 {
+      
replacement_config_(minifi::utils::file::get_content(replacement_config_path.string()))
 {
   }
 
   void handleHeartbeat(const rapidjson::Document& root, struct mg_connection* 
conn) override {
@@ -73,7 +73,7 @@ class MetricsHandler: public HeartbeatHandler {
         break;
       }
       case TestState::SEND_NEW_CONFIG: {
-        sendHeartbeatResponse("UPDATE", "configuration", "889348", conn, 
{{"configuration_data", replacement_config_}});
+        sendHeartbeatResponse("UPDATE", "configuration", "889348", conn, 
{{"configuration_data", minifi::c2::C2Value{replacement_config_}}});
         test_state_ = TestState::VERIFY_UPDATED_METRICS;
         break;
       }
@@ -178,14 +178,6 @@ class MetricsHandler: public HeartbeatHandler {
       
processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredBytes");
   }
 
-  [[nodiscard]] static std::string getReplacementConfigAsJsonValue(const 
std::string& replacement_config_path) {
-    std::ifstream is(replacement_config_path);
-    auto content = std::string((std::istreambuf_iterator<char>(is)), 
std::istreambuf_iterator<char>());
-    content = minifi::utils::string::replaceAll(content, "\n", "\\n");
-    content = minifi::utils::string::replaceAll(content, "\"", "\\\"");
-    return content;
-  }
-
   std::atomic_bool& metrics_updated_successfully_;
   TestState test_state_ = TestState::VERIFY_INITIAL_METRICS;
   std::string replacement_config_;
diff --git a/libminifi/test/integration/C2UpdateAssetTest.cpp 
b/libminifi/test/integration/C2UpdateAssetTest.cpp
index 71cd0b737..907b9f921 100644
--- a/libminifi/test/integration/C2UpdateAssetTest.cpp
+++ b/libminifi/test/integration/C2UpdateAssetTest.cpp
@@ -56,7 +56,7 @@ class C2HeartbeatHandler : public HeartbeatHandler {
     return true;
   }
 
-  void addOperation(std::string id, std::unordered_map<std::string, 
std::string> args) {
+  void addOperation(std::string id, std::unordered_map<std::string, 
c2::C2Value> args) {
     std::lock_guard<std::mutex> guard(op_mtx_);
     operations_.push_back(C2Operation{
       .operation = "update",
@@ -92,7 +92,7 @@ class VerifyC2AssetUpdate : public VerifyC2Base {
 
 struct AssetUpdateOperation {
   std::string id;
-  std::unordered_map<std::string, std::string> args;
+  std::unordered_map<std::string, c2::C2Value> args;
   std::string state;
   std::optional<std::string> details;
 };
@@ -103,7 +103,11 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
   // setup minifi home
   const std::filesystem::path home_dir = controller.createTempDirectory();
   const auto asset_dir = home_dir / "asset";
+
   std::filesystem::current_path(home_dir);
+  auto wd_guard = gsl::finally([] {
+    std::filesystem::current_path(minifi::utils::file::get_executable_dir());
+  });
 
   C2AcknowledgeHandler ack_handler;
   std::string file_A = "hello from file A";
@@ -130,7 +134,7 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
   operations.push_back({
     .id = "2",
     .args = {
-        {"file", "my_file.txt"}
+        {"file", minifi::c2::C2Value{"my_file.txt"}}
     },
     .state = "NOT_APPLIED",
     .details = "Couldn't find 'url' argument"
@@ -139,8 +143,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
   operations.push_back({
     .id = "3",
     .args = {
-        {"file", "my_file.txt"},
-        {"url", "/api/file/A.txt"}
+        {"file", minifi::c2::C2Value{"my_file.txt"}},
+        {"url", minifi::c2::C2Value{"/api/file/A.txt"}}
     },
     .state = "FULLY_APPLIED",
     .details = std::nullopt
@@ -149,8 +153,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
   operations.push_back({
     .id = "4",
     .args = {
-        {"file", "my_file.txt"},
-        {"url", "/api/file/A.txt"}
+        {"file", minifi::c2::C2Value{"my_file.txt"}},
+        {"url", minifi::c2::C2Value{"/api/file/A.txt"}}
     },
     .state = "NO_OPERATION",
     .details = std::nullopt
@@ -159,9 +163,9 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
   operations.push_back({
     .id = "5",
     .args = {
-        {"file", "my_file.txt"},
-        {"url", "/api/file/B.txt"},
-        {"forceDownload", "true"}
+        {"file", minifi::c2::C2Value{"my_file.txt"}},
+        {"url", minifi::c2::C2Value{"/api/file/B.txt"}},
+        {"forceDownload", minifi::c2::C2Value{"true"}}
     },
     .state = "FULLY_APPLIED",
     .details = std::nullopt
@@ -170,8 +174,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
   operations.push_back({
     .id = "6",
     .args = {
-        {"file", "new_dir/inner/my_file.txt"},
-        {"url", "/api/file/A.txt"}
+        {"file", minifi::c2::C2Value{"new_dir/inner/my_file.txt"}},
+        {"url", minifi::c2::C2Value{"/api/file/A.txt"}}
     },
     .state = "FULLY_APPLIED",
     .details = std::nullopt
@@ -180,8 +184,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
   operations.push_back({
     .id = "7",
     .args = {
-        {"file", "dummy.txt"},
-        {"url", "/not_existing_api/file.txt"}
+        {"file", minifi::c2::C2Value{"dummy.txt"}},
+        {"url", minifi::c2::C2Value{"/not_existing_api/file.txt"}}
     },
     .state = "NOT_APPLIED",
     .details = "Failed to fetch asset"
@@ -190,8 +194,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
   operations.push_back({
     .id = "8",
     .args = {
-        {"file", "../../system_lib.dll"},
-        {"url", "/not_existing_api/file.txt"}
+        {"file", minifi::c2::C2Value{"../../system_lib.dll"}},
+        {"url", minifi::c2::C2Value{"/not_existing_api/file.txt"}}
     },
     .state = "NOT_APPLIED",
     .details = "Accessing parent directory is forbidden in file path"
@@ -200,8 +204,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
   operations.push_back({
     .id = "9",
     .args = {
-        {"file", "other_dir/A.txt"},
-        {"url", absolute_file_A_url}
+        {"file", minifi::c2::C2Value{"other_dir/A.txt"}},
+        {"url", minifi::c2::C2Value{absolute_file_A_url}}
     },
     .state = "FULLY_APPLIED",
     .details = std::nullopt
@@ -244,11 +248,12 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
       // this op failed no file made on the disk
       continue;
     }
-    expected_files[(asset_dir / op.args["file"]).string()] = 
minifi::utils::string::endsWith(op.args["url"], "A.txt") ? file_A : file_B;
+    expected_files[(asset_dir / op.args["file"].to_string()).string()] = 
minifi::utils::string::endsWith(op.args["url"].to_string(), "A.txt") ? file_A : 
file_B;
   }
 
   size_t file_count = minifi::utils::file::list_dir_all(asset_dir.string(), 
controller.getLogger()).size();
-  if (file_count != expected_files.size()) {
+  // + 1 is for the .state file from the AssetManager
+  if (file_count != expected_files.size() + 1) {
     controller.getLogger()->log_error("Expected {} files, got {}", 
expected_files.size(), file_count);
     REQUIRE(false);
   }
@@ -258,8 +263,6 @@ TEST_CASE("Test update asset C2 command", "[c2test]") {
       REQUIRE(false);
     }
   }
-
-  std::filesystem::current_path(minifi::utils::file::get_executable_dir());
 }
 
 }  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/libtest/integration/HTTPHandlers.cpp 
b/libminifi/test/libtest/integration/HTTPHandlers.cpp
index eb357b2b9..31578022a 100644
--- a/libminifi/test/libtest/integration/HTTPHandlers.cpp
+++ b/libminifi/test/libtest/integration/HTTPHandlers.cpp
@@ -56,7 +56,7 @@ bool PeerResponder::handleGet(CivetServer* /*server*/, struct 
mg_connection *con
 #else
   std::string hostname = "localhost";
 #endif
-  std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"" + 
hostname + "\", \"port\": " + port + ",  \"secure\": false, \"flowFileCount\" : 
0 }] }";
+  std::string site2site_rest_resp = R"({"peers" : [{ "hostname": ")" + 
hostname + R"(", "port": )" + port + R"(,  "secure": false, "flowFileCount" : 0 
}] })";
   std::stringstream headers;
   headers << "HTTP/1.1 200 OK\r\nContent-Type: 
application/json\r\nContent-Length: " << site2site_rest_resp.length() << 
"\r\nConnection: close\r\n\r\n";
   mg_printf(conn, "%s", headers.str().c_str());
@@ -134,7 +134,8 @@ bool FlowFileResponder::handlePost(CivetServer* /*server*/, 
struct mg_connection
     const auto flow = std::make_shared<FlowObj>();
 
     for (uint32_t i = 0; i < num_attributes; i++) {
-      std::string name, value;
+      std::string name;
+      std::string value;
       {
         const auto read = stream.read(name, true);
         if (!isServerRunning()) return false;
@@ -204,7 +205,7 @@ bool FlowFileResponder::handleGet(CivetServer* /*server*/, 
struct mg_connection
     minifi::io::BufferStream serializer;
     minifi::io::CRCStream <minifi::io::OutputStream> 
stream(gsl::make_not_null(&serializer));
     for (const auto& flow : flows) {
-      uint32_t num_attributes = gsl::narrow<uint32_t>(flow->attributes.size());
+      auto num_attributes = gsl::narrow<uint32_t>(flow->attributes.size());
       stream.write(num_attributes);
       for (const auto& entry : flow->attributes) {
         stream.write(entry.first);
@@ -235,41 +236,38 @@ bool DeleteTransactionResponder::handleDelete(CivetServer* /*server*/, struct mg
 }
 
 void HeartbeatHandler::sendHeartbeatResponse(const std::vector<C2Operation>& 
operations, struct mg_connection * conn) {
-  std::string operation_jsons;
+  rapidjson::Document hb_obj{rapidjson::kObjectType};
+  hb_obj.AddMember("operation", "heartbeat", hb_obj.GetAllocator());
+  hb_obj.AddMember("requested_operations", rapidjson::kArrayType, 
hb_obj.GetAllocator());
   for (const auto& c2_operation : operations) {
-    std::string resp_args;
+    rapidjson::Value op{rapidjson::kObjectType};
+    op.AddMember("operation", c2_operation.operation, hb_obj.GetAllocator());
+    op.AddMember("operationid", c2_operation.operation_id, 
hb_obj.GetAllocator());
+    op.AddMember("operand", c2_operation.operand, hb_obj.GetAllocator());
     if (!c2_operation.args.empty()) {
-      resp_args = ", \"args\": {";
-      auto it = c2_operation.args.begin();
-      while (it != c2_operation.args.end()) {
-        resp_args += "\"" + it->first + "\": \"" + it->second + "\"";
-        ++it;
-        if (it != c2_operation.args.end()) {
-          resp_args += ", ";
+      rapidjson::Value args{rapidjson::kObjectType};
+      for (auto& [arg_name, arg_val] : c2_operation.args) {
+        rapidjson::Value json_arg_val;
+        if (auto* json_val = arg_val.json()) {
+          json_arg_val.CopyFrom(*json_val, hb_obj.GetAllocator());
+        } else {
+          json_arg_val.SetString(arg_val.to_string(), hb_obj.GetAllocator());
         }
+        args.AddMember(rapidjson::StringRef(arg_name), json_arg_val, 
hb_obj.GetAllocator());
       }
-      resp_args += "}";
-    }
-
-    std::string operation_json = "{"
-      "\"operation\" : \"" + c2_operation.operation + "\","
-      "\"operationid\" : \"" + c2_operation.operation_id + "\","
-      "\"operand\": \"" + c2_operation.operand + "\"" +
-      resp_args + "}";
-
-    if (operation_jsons.empty()) {
-      operation_jsons += operation_json;
-    } else {
-      operation_jsons += ", " + operation_json;
+      op.AddMember("args", args, hb_obj.GetAllocator());
     }
+    hb_obj["requested_operations"].PushBack(op, hb_obj.GetAllocator());
   }
 
-  std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ " + operation_jsons + " ]}";
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  hb_obj.Accept(writer);
 
   mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
             "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-            heartbeat_response.length());
-  mg_printf(conn, "%s", heartbeat_response.c_str());
+            buffer.GetLength());
+  mg_printf(conn, "%s", buffer.GetString());
 }
 
 void HeartbeatHandler::verifyJsonHasAgentManifest(const rapidjson::Document& 
root, const std::vector<std::string>& verify_components, const 
std::vector<std::string>& disallowed_properties) {
@@ -477,9 +475,9 @@ bool C2UpdateHandler::handlePost(CivetServer* /*server*/, struct mg_connection *
 }
 
 void C2UpdateHandler::setC2RestResponse(const std::string& url, const 
std::string& name, const std::optional<std::string>& persist) {
-  std::string content = "{\"location\": \"" + url + "\"";
+  std::string content = R"({"location": ")" + url + "\"";
   if (persist) {
-    content += ", \"persist\": \"" + *persist + "\"";
+    content += R"(, "persist": ")" + *persist + "\"";
   }
   content += "}";
   response_ =
diff --git a/libminifi/test/libtest/integration/HTTPHandlers.h b/libminifi/test/libtest/integration/HTTPHandlers.h
index a47d2a506..c73395ed0 100644
--- a/libminifi/test/libtest/integration/HTTPHandlers.h
+++ b/libminifi/test/libtest/integration/HTTPHandlers.h
@@ -214,15 +214,15 @@ class HeartbeatHandler : public ServerAwareHandler {
     std::string operation;
     std::string operand;
     std::string operation_id;
-    std::unordered_map<std::string, std::string> args;
+    std::unordered_map<std::string, c2::C2Value> args;
   };
 
-  void sendHeartbeatResponse(const std::string& operation, const std::string& 
operand, const std::string& operation_id, struct mg_connection* conn,
-      const std::unordered_map<std::string, std::string>& args = {}) {
+  static void sendHeartbeatResponse(const std::string& operation, const 
std::string& operand, const std::string& operation_id, struct mg_connection* 
conn,
+      const std::unordered_map<std::string, c2::C2Value>& args = {}) {
     sendHeartbeatResponse({{operation, operand, operation_id, args}}, conn);
   }
 
-  void sendHeartbeatResponse(const std::vector<C2Operation>& operations, 
struct mg_connection * conn);
+  static void sendHeartbeatResponse(const std::vector<C2Operation>& 
operations, struct mg_connection * conn);
   void verifyJsonHasAgentManifest(const rapidjson::Document& root, const 
std::vector<std::string>& verify_components = {}, const 
std::vector<std::string>& disallowed_properties = {});
   void verify(struct mg_connection *conn);
 
diff --git a/libminifi/test/libtest/integration/IntegrationBase.cpp b/libminifi/test/libtest/integration/IntegrationBase.cpp
index 36efc0cee..72ae844fd 100644
--- a/libminifi/test/libtest/integration/IntegrationBase.cpp
+++ b/libminifi/test/libtest/integration/IntegrationBase.cpp
@@ -23,6 +23,7 @@
 #include "utils/HTTPUtils.h"
 #include "unit/ProvenanceTestHelper.h"
 #include "utils/FifoExecutor.h"
+#include "utils/file/AssetManager.h"
 #include "core/ConfigurationFactory.h"
 
 namespace org::apache::nifi::minifi::test {
@@ -117,9 +118,10 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_
     };
 
     std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repo_metric_sources{test_repo, test_flow_repo, content_repo};
-    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration, 
repo_metric_sources, flow_config);
+    asset_manager_ = 
std::make_unique<minifi::utils::file::AssetManager>(*configuration);
+    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration, 
repo_metric_sources, flow_config, asset_manager_.get());
     flowController_ = std::make_unique<minifi::FlowController>(test_repo, 
test_flow_repo, configuration,
-      std::move(flow_config), content_repo, 
std::move(metrics_publisher_store), filesystem, request_restart);
+      std::move(flow_config), content_repo, 
std::move(metrics_publisher_store), filesystem, request_restart, 
asset_manager_.get());
     flowController_->load();
     updateProperties(*flowController_);
     flowController_->start();
diff --git a/libminifi/test/libtest/integration/IntegrationBase.h b/libminifi/test/libtest/integration/IntegrationBase.h
index 8c374159d..86a46e3f8 100644
--- a/libminifi/test/libtest/integration/IntegrationBase.h
+++ b/libminifi/test/libtest/integration/IntegrationBase.h
@@ -28,6 +28,7 @@
 #include "core/ProcessGroup.h"
 #include "FlowController.h"
 #include "properties/Configure.h"
+#include "utils/file/AssetManager.h"
 
 namespace minifi = org::apache::nifi::minifi;
 namespace core = minifi::core;
@@ -107,6 +108,7 @@ class IntegrationBase {
 
   void configureSecurity();
   std::shared_ptr<minifi::Configure> configuration;
+  std::unique_ptr<minifi::utils::file::AssetManager> asset_manager_;
   std::unique_ptr<minifi::state::response::ResponseNodeLoader> 
response_node_loader_;
   std::unique_ptr<minifi::FlowController> flowController_;
   std::chrono::milliseconds wait_time_;
diff --git a/libminifi/test/resources/encrypted.minifi.properties b/libminifi/test/resources/encrypted.minifi.properties
index c9f3eac06..f19422e43 100644
--- a/libminifi/test/resources/encrypted.minifi.properties
+++ b/libminifi/test/resources/encrypted.minifi.properties
@@ -57,7 +57,7 @@ nifi.c2.enable=true
 nifi.c2.flow.base.url=http://localhost:10080/c2-server/api
 nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat
 nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge
-nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
+nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation
 ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat
 #nifi.c2.full.heartbeat=false
 ## heartbeat 4 times a second
diff --git a/libminifi/test/unit/PayloadParserTests.cpp b/libminifi/test/unit/PayloadParserTests.cpp
index bdb2f5046..0a9702bfa 100644
--- a/libminifi/test/unit/PayloadParserTests.cpp
+++ b/libminifi/test/unit/PayloadParserTests.cpp
@@ -30,7 +30,7 @@ TEST_CASE("Test Valid Payload", "[tv1]") {
   minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident);
   minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, 
minifi::state::UpdateState::FULLY_APPLIED, cheese);
   minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge);
-  response.operation_arguments["type"] = "munster";
+  response.operation_arguments["type"] = minifi::c2::C2Value{"munster"};
   payload2.addContent(std::move(response));
   payload.addPayload(std::move(payload2));
   payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, 
chips));
@@ -44,7 +44,7 @@ TEST_CASE("Test Invalid not found", "[tv2]") {
   minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident);
   minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, cheese);
   minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge);
-  response.operation_arguments["typeS"] = "munster";
+  response.operation_arguments["typeS"] = minifi::c2::C2Value{"munster"};
   payload2.addContent(std::move(response));
   payload.addPayload(std::move(payload2));
   payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, 
chips));
@@ -59,7 +59,7 @@ TEST_CASE("Test Invalid coercion", "[tv3]") {
   minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident);
   minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, 
minifi::state::UpdateState::FULLY_APPLIED, cheese);
   minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge);
-  response.operation_arguments["type"] = "munster";
+  response.operation_arguments["type"] = minifi::c2::C2Value{"munster"};
   payload2.addContent(std::move(response));
   payload.addPayload(std::move(payload2));
   payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, 
chips));
@@ -73,7 +73,7 @@ TEST_CASE("Test Invalid not there", "[tv4]") {
   minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident);
   minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, 
minifi::state::UpdateState::FULLY_APPLIED, cheese);
   minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge);
-  response.operation_arguments["type"] = "munster";
+  response.operation_arguments["type"] = minifi::c2::C2Value{"munster"};
   payload2.addContent(std::move(response));
   payload.addPayload(std::move(payload2));
   payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, 
chips));
@@ -89,9 +89,9 @@ TEST_CASE("Test typed conversions", "[tv5]") {
   minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident);
   minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, 
minifi::state::UpdateState::FULLY_APPLIED, cheese);
   minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge);
-  response.operation_arguments["type"] = "munster";
-  response.operation_arguments["isvalid"] = isvalid;
-  response.operation_arguments["size"] = size;
+  response.operation_arguments["type"] = minifi::c2::C2Value{"munster"};
+  response.operation_arguments["isvalid"] = minifi::c2::C2Value{isvalid};
+  response.operation_arguments["size"] = minifi::c2::C2Value{size};
   payload2.addContent(std::move(response));
   payload.addPayload(std::move(payload2));
   payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, 
chips));
@@ -108,7 +108,7 @@ TEST_CASE("Test Invalid not there deep", "[tv6]") {
   minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident);
   minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, 
minifi::state::UpdateState::FULLY_APPLIED, cheese);
   minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge);
-  response.operation_arguments["type"] = "munster";
+  response.operation_arguments["type"] = minifi::c2::C2Value{"munster"};
   payload2.addContent(std::move(response));
   payload.addPayload(std::move(payload2));
   payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, 
chips));
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index 782b06c80..cf15f5633 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -57,6 +57,7 @@
 #include "properties/Decryptor.h"
 #include "utils/file/PathUtils.h"
 #include "utils/file/FileUtils.h"
+#include "utils/file/AssetManager.h"
 #include "utils/Environment.h"
 #include "utils/FileMutex.h"
 #include "FlowController.h"
@@ -397,11 +398,13 @@ int main(int argc, char **argv) {
           .sensitive_values_encryptor = 
utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome)
       }, nifi_configuration_class_name);
 
-    std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repo_metric_sources{prov_repo, flow_repo, content_repo};
-    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configure, 
repo_metric_sources, flow_configuration);
+    auto asset_manager = 
std::make_unique<utils::file::AssetManager>(*configure);
 
+    std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repo_metric_sources{prov_repo, flow_repo, content_repo};
+    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configure, 
repo_metric_sources, flow_configuration, asset_manager.get());
     const auto controller = std::make_unique<minifi::FlowController>(
-        prov_repo, flow_repo, configure, std::move(flow_configuration), 
content_repo, std::move(metrics_publisher_store), filesystem, request_restart);
+        prov_repo, flow_repo, configure, std::move(flow_configuration), 
content_repo,
+        std::move(metrics_publisher_store), filesystem, request_restart, 
asset_manager.get());
 
     const bool disk_space_watchdog_enable = 
configure->get(minifi::Configure::minifi_disk_space_watchdog_enable)
         | utils::andThen(utils::string::toBool)

Reply via email to