http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/MetricsBase.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h 
b/libminifi/include/core/state/nodes/MetricsBase.h
new file mode 100644
index 0000000..323b6cd
--- /dev/null
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -0,0 +1,264 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_METRICS_METRICSBASE_H_
+#define LIBMINIFI_INCLUDE_METRICS_METRICSBASE_H_
+
+#include <vector>
+#include <memory>
+#include <string>
+#include "core/Core.h"
+#include "core/Connectable.h"
+#include "core/state/Value.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace response {
+
+
+
+/**
+ * Purpose: Defines a metric. Serialization is intended to be thread safe.
+ */
+class ResponseNode : public core::Connectable {
+ public:
+  ResponseNode()
+      : core::Connectable("metric", 0),
+        is_array_(false) {
+  }
+
+  ResponseNode(std::string name, uuid_t uuid)
+      : core::Connectable(name, uuid),
+        is_array_(false) {
+  }
+  virtual ~ResponseNode() {
+
+  }
+  virtual std::string getName() const = 0;
+
+  virtual std::vector<SerializedResponseNode> serialize() = 0;
+
+  virtual void yield() {
+  }
+  virtual bool isRunning() {
+    return true;
+  }
+  virtual bool isWorkAvailable() {
+    return true;
+  }
+
+  bool isArray() {
+    return is_array_;
+  }
+
+  virtual bool isEmpty(){
+    return false;
+  }
+
+ protected:
+
+  bool is_array_;
+
+  void setArray(bool array) {
+    is_array_ = array;
+  }
+
+};
+
+/**
+ * Purpose: Defines a metric that
+ */
+class DeviceInformation : public ResponseNode {
+ public:
+  DeviceInformation(std::string name, uuid_t uuid)
+      : ResponseNode(name, uuid) {
+  }
+};
+
+/**
+ * Purpose: Defines a metric that
+ */
+class ObjectNode : public ResponseNode {
+ public:
+  ObjectNode(std::string name, uuid_t uuid)
+      : ResponseNode(name, uuid) {
+  }
+
+  void add_node(const std::shared_ptr<ResponseNode> &node) {
+    nodes_.push_back(node);
+  }
+
+  virtual std::string getName() const {
+    return Connectable::getName();
+  }
+
+  virtual std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized;
+//    SerializedResponseNode outer_node;
+  //  outer_node.name = getName();
+    for (auto &node : nodes_) {
+      SerializedResponseNode inner_node;
+      inner_node.name = node->getName();
+      for (auto &embed : node->serialize()) {
+        inner_node.children.push_back(std::move(embed));
+      }
+      serialized.push_back(std::move(inner_node));
+    }
+   //serialized.push_back(std::move(outer_node));
+    return serialized;
+  }
+
+  virtual bool isEmpty(){
+    return nodes_.empty();
+  }
+
+ protected:
+  std::vector<std::shared_ptr<ResponseNode>> nodes_;
+
+};
+
+/**
+ * Purpose: Retrieves Metrics from the defined class. The current Metric, 
which is a consumable for any reader of Metrics must have the ability to set 
metrics.
+ *
+ */
+class ResponseNodeSource {
+ public:
+
+  ResponseNodeSource() {
+
+  }
+
+  virtual ~ResponseNodeSource() {
+  }
+
+  /**
+   * Retrieves all metrics from this source.
+   * @param metric_vector -- metrics will be placed in this vector.
+   * @return result of the get operation.
+   *  0 Success
+   *  1 No error condition, but cannot obtain lock in timely manner.
+   *  -1 failure
+   */
+  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>> 
&metric_vector) = 0;
+
+  virtual int16_t getMetricNodes(std::vector<std::shared_ptr<ResponseNode>> 
&metric_vector) = 0;
+
+};
+
+/**
+ * Purpose: Retrieves Metrics from the defined class. The current Metric, 
which is a consumable for any reader of Metrics must have the ability to set 
metrics.
+ *
+ */
+class MetricsNodeSource : public ResponseNodeSource {
+ public:
+
+  MetricsNodeSource() {
+
+  }
+
+  virtual ~MetricsNodeSource() {
+  }
+
+  /**
+   * Retrieves all metrics from this source.
+   * @param metric_vector -- metrics will be placed in this vector.
+   * @return result of the get operation.
+   *  0 Success
+   *  1 No error condition, but cannot obtain lock in timely manner.
+   *  -1 failure
+   */
+  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>> 
&metric_vector) {
+    return getMetricNodes(metric_vector);
+  }
+
+  virtual int16_t getMetricNodes(std::vector<std::shared_ptr<ResponseNode>> 
&metric_vector) = 0;
+
+};
+
+class NodeReporter {
+ public:
+
+  NodeReporter() {
+
+  }
+
+  virtual ~NodeReporter() {
+  }
+
+  /**
+   * Retrieves all root response nodes from this source.
+   * @param metric_vector -- metrics will be placed in this vector.
+   * @return result of the get operation.
+   *  0 Success
+   *  1 No error condition, but cannot obtain lock in timely manner.
+   *  -1 failure
+   */
+  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>> 
&metric_vector, uint16_t metricsClass) = 0;
+
+  /**
+   * Retrieves all metrics from this source.
+   * @param metric_vector -- metrics will be placed in this vector.
+   * @return result of the get operation.
+   *  0 Success
+   *  1 No error condition, but cannot obtain lock in timely manner.
+   *  -1 failure
+   */
+  virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<ResponseNode>> 
&metric_vector, uint16_t metricsClass) = 0;
+
+};
+
+/**
+ * Purpose: Sink interface for all metrics. The current Metric, which is a 
consumable for any reader of Metrics must have the ability to set metrics.
+ *
+ */
+class ResponseNodeSink {
+ public:
+
+  virtual ~ResponseNodeSink() {
+  }
+  /**
+   * Setter for nodes in this sink.
+   * @param metrics metrics to insert into the current sink.
+   * @return result of the set operation.
+   *  0 Success
+   *  1 No error condition, but cannot obtain lock in timely manner.
+   *  -1 failure
+   */
+  virtual int16_t setResponseNodes(const std::shared_ptr<ResponseNode> 
&metrics) = 0;
+
+  /**
+   * Setter for metrics nodes in this sink.
+   * @param metrics metrics to insert into the current sink.
+   * @return result of the set operation.
+   *  0 Success
+   *  1 No error condition, but cannot obtain lock in timely manner.
+   *  -1 failure
+   */
+//  virtual int16_t setMetricsNodes(const std::shared_ptr<ResponseNode> 
&metrics) = 0;
+};
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_METRICS_METRICSBASE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/ProcessMetrics.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/nodes/ProcessMetrics.h 
b/libminifi/include/core/state/nodes/ProcessMetrics.h
new file mode 100644
index 0000000..ebef0d1
--- /dev/null
+++ b/libminifi/include/core/state/nodes/ProcessMetrics.h
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_PROCMETRICS_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_PROCMETRICS_H_
+
+#include "core/Resource.h"
+#include <sstream>
+#include <map>
+#include <sys/time.h>
+#include <sys/resource.h>
+
+#include "../nodes/DeviceInformation.h"
+#include "../nodes/MetricsBase.h"
+#include "Connection.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace response {
+
+/**
+ * Justification and Purpose: Provides Connection queue metrics. Provides 
critical information to the
+ * C2 server.
+ *
+ */
+class ProcessMetrics : public ResponseNode {
+ public:
+
+  ProcessMetrics(const std::string &name, uuid_t uuid)
+      : ResponseNode(name, uuid) {
+  }
+
+  ProcessMetrics(const std::string &name)
+      : ResponseNode(name, 0) {
+  }
+
+  ProcessMetrics() {
+  }
+
+  virtual std::string getName() const {
+    return "ProcessMetrics";
+  }
+
+  std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized;
+
+    struct rusage my_usage;
+    getrusage(RUSAGE_SELF, &my_usage);
+
+    SerializedResponseNode memory;
+    memory.name = "MemoryMetrics";
+
+    SerializedResponseNode maxrss;
+    maxrss.name = "maxrss";
+
+    maxrss.value = (uint64_t)my_usage.ru_maxrss;
+
+    memory.children.push_back(maxrss);
+    serialized.push_back(memory);
+
+    SerializedResponseNode cpu;
+    cpu.name = "CpuMetrics";
+    SerializedResponseNode ics;
+    ics.name = "involcs";
+
+    ics.value = (uint64_t)my_usage.ru_nivcsw;
+
+    cpu.children.push_back(ics);
+    serialized.push_back(cpu);
+
+    return serialized;
+  }
+
+ protected:
+
+};
+
+REGISTER_RESOURCE(ProcessMetrics);
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/QueueMetrics.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/nodes/QueueMetrics.h 
b/libminifi/include/core/state/nodes/QueueMetrics.h
new file mode 100644
index 0000000..b4c0b64
--- /dev/null
+++ b/libminifi/include/core/state/nodes/QueueMetrics.h
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_QUEUEMETRICS_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_QUEUEMETRICS_H_
+
+#include <sstream>
+#include <map>
+
+#include "../nodes/MetricsBase.h"
+#include "Connection.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace response {
+
+/**
+ * Justification and Purpose: Provides Connection queue metrics. Provides 
critical information to the
+ * C2 server.
+ *
+ */
+class QueueMetrics : public ResponseNode {
+ public:
+
+  QueueMetrics(const std::string &name, uuid_t uuid)
+      : ResponseNode(name, uuid) {
+  }
+
+  QueueMetrics(const std::string &name)
+      : ResponseNode(name, 0) {
+  }
+
+  QueueMetrics()
+      : ResponseNode("QueueMetrics", 0) {
+  }
+
+  virtual std::string getName() const{
+    return "QueueMetrics";
+  }
+
+  void addConnection(const std::shared_ptr<minifi::Connection> &connection) {
+    if (nullptr != connection) {
+      connections.insert(std::make_pair(connection->getName(), connection));
+    }
+  }
+
+  std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized;
+    for (auto conn : connections) {
+      auto connection = conn.second;
+      SerializedResponseNode parent;
+      parent.name = connection->getName();
+      SerializedResponseNode datasize;
+      datasize.name = "datasize";
+      datasize.value = std::to_string(connection->getQueueDataSize());
+
+      SerializedResponseNode datasizemax;
+      datasizemax.name = "datasizemax";
+      datasizemax.value = std::to_string(connection->getMaxQueueDataSize());
+
+      SerializedResponseNode queuesize;
+      queuesize.name = "queued";
+      queuesize.value = std::to_string(connection->getQueueSize());
+
+      SerializedResponseNode queuesizemax;
+      queuesizemax.name = "queuedmax";
+      queuesizemax.value = std::to_string(connection->getMaxQueueSize());
+
+      parent.children.push_back(datasize);
+      parent.children.push_back(datasizemax);
+      parent.children.push_back(queuesize);
+      parent.children.push_back(queuesizemax);
+
+      serialized.push_back(parent);
+    }
+    return serialized;
+  }
+
+ protected:
+  std::map<std::string, std::shared_ptr<minifi::Connection>> connections;
+};
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_NODES_QUEUEMETRICS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/RepositoryMetrics.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/nodes/RepositoryMetrics.h 
b/libminifi/include/core/state/nodes/RepositoryMetrics.h
new file mode 100644
index 0000000..5c2db2a
--- /dev/null
+++ b/libminifi/include/core/state/nodes/RepositoryMetrics.h
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_REPOSITORYMETRICS_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_REPOSITORYMETRICS_H_
+
+#include <sstream>
+#include <map>
+
+#include "../nodes/MetricsBase.h"
+#include "Connection.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace response {
+
+/**
+ * Justification and Purpose: Provides repository metrics. Provides critical 
information to the
+ * C2 server.
+ *
+ */
+class RepositoryMetrics : public ResponseNode {
+ public:
+
+  RepositoryMetrics(const std::string &name, uuid_t uuid)
+      : ResponseNode(name, uuid) {
+  }
+
+  RepositoryMetrics(const std::string &name)
+      : ResponseNode(name, 0) {
+  }
+
+  RepositoryMetrics()
+      : ResponseNode("RepositoryMetrics", 0) {
+  }
+
+  virtual std::string getName() const {
+    return "RepositoryMetrics";
+  }
+
+  void addRepository(const std::shared_ptr<core::Repository> &repo) {
+    if (nullptr != repo) {
+      repositories.insert(std::make_pair(repo->getName(), repo));
+    }
+  }
+
+  std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized;
+    for (auto conn : repositories) {
+      auto repo = conn.second;
+      SerializedResponseNode parent;
+      parent.name = repo->getName();
+      SerializedResponseNode datasize;
+      datasize.name = "running";
+      datasize.value = repo->isRunning();
+
+      SerializedResponseNode datasizemax;
+      datasizemax.name = "full";
+      datasizemax.value = repo->isFull();
+
+      SerializedResponseNode queuesize;
+      queuesize.name = "size";
+      queuesize.value = std::to_string(repo->getRepoSize());
+
+      parent.children.push_back(datasize);
+      parent.children.push_back(datasizemax);
+      parent.children.push_back(queuesize);
+
+      serialized.push_back(parent);
+    }
+    return serialized;
+  }
+
+ protected:
+  std::map<std::string, std::shared_ptr<core::Repository>> repositories;
+};
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_RepositoryMetrics_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/StateMonitor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/nodes/StateMonitor.h 
b/libminifi/include/core/state/nodes/StateMonitor.h
new file mode 100644
index 0000000..1bf3e23
--- /dev/null
+++ b/libminifi/include/core/state/nodes/StateMonitor.h
@@ -0,0 +1,87 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_STATEMONITOR_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_STATEMONITOR_H_
+
+#include "core/Resource.h"
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <functional>
+#include <sys/ioctl.h>
+#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) 
+#include <net/if_dl.h>
+#include <net/if_types.h>
+#endif
+#include <ifaddrs.h>
+#include <net/if.h> 
+#include <unistd.h>
+#include <netinet/in.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <ifaddrs.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sstream>
+#include <map>
+#include "../nodes/MetricsBase.h"
+#include "Connection.h"
+#include "io/ClientSocket.h"
+#include "agent/agent_version.h"
+#include "agent/build_description.h"
+#include "core/ClassLoader.h"
+#include "core/state/UpdateController.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace response {
+
+class StateMonitorNode : public DeviceInformation {
+ public:
+
+  StateMonitorNode(std::string name, uuid_t uuid)
+      : DeviceInformation(name, uuid),
+        monitor_(nullptr) {
+
+  }
+
+  StateMonitorNode(const std::string &name)
+      : DeviceInformation(name, 0),
+        monitor_(nullptr) {
+  }
+
+  void setStateMonitor(const std::shared_ptr<state::StateMonitor> &monitor) {
+    monitor_ = monitor;
+  }
+ protected:
+  std::shared_ptr<state::StateMonitor> monitor_;
+};
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_NODES_STATEMONITOR_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/SystemMetrics.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/nodes/SystemMetrics.h 
b/libminifi/include/core/state/nodes/SystemMetrics.h
new file mode 100644
index 0000000..a7a3c9c
--- /dev/null
+++ b/libminifi/include/core/state/nodes/SystemMetrics.h
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_METRICS_SYSMETRICS_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_METRICS_SYSMETRICS_H_
+
+#include "core/Resource.h"
+#include <sstream>
+#include <map>
+#ifndef _WIN32
+#include <sys/utsname.h>
+#endif
+#include "../nodes/MetricsBase.h"
+#include "Connection.h"
+#include "../nodes/DeviceInformation.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace response {
+
+/**
+ * Justification and Purpose: Provides system information, including critical 
device information.
+ *
+ */
+class SystemInformation : public DeviceInformation {
+ public:
+
+  SystemInformation(const std::string &name, uuid_t uuid)
+      : DeviceInformation(name, uuid) {
+  }
+
+  SystemInformation(const std::string &name)
+      : DeviceInformation(name, 0) {
+  }
+
+  SystemInformation()
+      : DeviceInformation("systeminfo", 0) {
+  }
+
+  virtual std::string getName() const {
+    return "systeminfo";
+  }
+
+  std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized;
+    SerializedResponseNode identifier;
+    identifier.name = "identifier";
+    identifier.value = "identifier";
+
+    SerializedResponseNode systemInfo;
+    systemInfo.name = "systemInfo";
+
+    SerializedResponseNode vcores;
+    vcores.name = "vCores";
+    size_t ncpus = std::thread::hardware_concurrency();
+
+    vcores.value = (uint32_t)ncpus;
+
+    systemInfo.children.push_back(vcores);
+
+    SerializedResponseNode mem;
+    mem.name = "physicalMem";
+#if defined(_SC_PHYS_PAGES) && defined(_SC_PAGESIZE)
+    size_t mema = (size_t) sysconf( _SC_PHYS_PAGES) * (size_t) sysconf( 
_SC_PAGESIZE);
+#endif
+    mem.value = (uint32_t)mema;
+
+    systemInfo.children.push_back(mem);
+
+    SerializedResponseNode arch;
+    arch.name = "machinearch";
+
+    utsname buf;
+
+    if (uname(&buf) == -1) {
+      arch.value = "unknown";
+    } else {
+      arch.value = std::string(buf.machine);
+    }
+
+    systemInfo.children.push_back(arch);
+
+    serialized.push_back(identifier);
+    serialized.push_back(systemInfo);
+
+    return serialized;
+  }
+
+ protected:
+
+};
+
+REGISTER_RESOURCE(SystemInformation);
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_METRICS_QUEUEMETRICS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/state/nodes/TreeUpdateListener.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/nodes/TreeUpdateListener.h 
b/libminifi/include/core/state/nodes/TreeUpdateListener.h
new file mode 100644
index 0000000..4b7e237
--- /dev/null
+++ b/libminifi/include/core/state/nodes/TreeUpdateListener.h
@@ -0,0 +1,128 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_METRICS_H_
+#define LIBMINIFI_INCLUDE_C2_METRICS_H_
+
+#include <vector>
+
+#include "../nodes/MetricsBase.h"
+#include "core/state/UpdateController.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+namespace response {
+
+/**
+ * Purpose: Class that will represent the metrics updates, which can be 
performed asynchronously.
+ */
+class MetricsUpdate : public Update {
+ public:
+  MetricsUpdate(UpdateStatus status)
+      : Update(status) {
+
+  }
+  virtual bool validate() {
+    return true;
+  }
+};
+
+class OperationWatcher : public utils::AfterExecute<Update> {
+ public:
+  explicit OperationWatcher(std::atomic<bool> *running)
+      : running_(running) {
+  }
+
+  explicit OperationWatcher(OperationWatcher && other)
+      : running_(std::move(other.running_)) {
+
+  }
+
+  ~OperationWatcher() {
+
+  }
+
+  virtual bool isFinished(const Update &result) {
+    if (result.getStatus().getState() == UpdateState::READ_COMPLETE && 
running_) {
+      return false;
+    } else {
+      return true;
+    }
+  }
+  virtual bool isCancelled(const Update &result) {
+    return false;
+  }
+
+ protected:
+  std::atomic<bool> *running_;
+
+};
+
+class TreeUpdateListener {
+ public:
+  TreeUpdateListener(const std::shared_ptr<response::NodeReporter> &source, 
const std::shared_ptr<response::ResponseNodeSink> &sink)
+      : running_(true),
+        source_(source),
+        sink_(sink){
+
+    function_ = [&]() {
+      while(running_) {
+        std::vector<std::shared_ptr<response::ResponseNode>> metric_vector;
+        // simple pass through for the metrics
+        if (nullptr != source_ && nullptr != sink_) {
+          source_->getResponseNodes(metric_vector,0);
+          for(auto metric : metric_vector) {
+            sink_->setResponseNodes(metric);
+          }
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+      }
+      return MetricsUpdate(UpdateState::READ_COMPLETE);
+    };
+  }
+
+  void stop() {
+    running_ = false;
+  }
+
+  std::function<Update()> &getFunction() {
+    return function_;
+  }
+
+  std::future<Update> &getFuture() {
+    return future_;
+  }
+
+ private:
+  std::function<Update()> function_;
+  std::future<Update> future_;
+  std::atomic<bool> running_;
+  std::shared_ptr<response::NodeReporter> source_;
+  std::shared_ptr<response::ResponseNodeSink> sink_;
+};
+
+} /* namespace metrics */
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_METRICS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/processors/GetFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GetFile.h 
b/libminifi/include/processors/GetFile.h
index 07d3939..a9918c6 100644
--- a/libminifi/include/processors/GetFile.h
+++ b/libminifi/include/processors/GetFile.h
@@ -19,13 +19,14 @@
 #define __GET_FILE_H__
 
 #include <atomic>
+
+#include "../core/state/nodes/MetricsBase.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "core/state/metrics/MetricsBase.h"
 
 namespace org {
 namespace apache {
@@ -45,17 +46,17 @@ struct GetFileRequest {
   std::string fileFilter = "[^\\.].*";
 };
 
-class GetFileMetrics : public state::metrics::Metrics {
+class GetFileMetrics : public state::response::ResponseNode {
  public:
   GetFileMetrics()
-      : state::metrics::Metrics("GetFileMetrics", 0) {
+      : state::response::ResponseNode("GetFileMetrics", 0) {
     iterations_ = 0;
     accepted_files_ = 0;
     input_bytes_ = 0;
   }
 
   GetFileMetrics(std::string name, uuid_t uuid)
-      : state::metrics::Metrics(name, uuid) {
+      : state::response::ResponseNode(name, uuid) {
     iterations_ = 0;
     accepted_files_ = 0;
     input_bytes_ = 0;
@@ -67,24 +68,24 @@ class GetFileMetrics : public state::metrics::Metrics {
     return core::Connectable::getName();
   }
 
-  virtual std::vector<state::metrics::MetricResponse> serialize() {
-    std::vector<state::metrics::MetricResponse> resp;
+  virtual std::vector<state::response::SerializedResponseNode> serialize() {
+    std::vector<state::response::SerializedResponseNode> resp;
 
-    state::metrics::MetricResponse iter;
+    state::response::SerializedResponseNode iter;
     iter.name = "OnTriggerInvocations";
-    iter.value = std::to_string(iterations_.load());
+    iter.value = (uint32_t)iterations_.load();
 
     resp.push_back(iter);
 
-    state::metrics::MetricResponse accepted_files;
+    state::response::SerializedResponseNode accepted_files;
     accepted_files.name = "AcceptedFiles";
-    accepted_files.value = std::to_string(accepted_files_.load());
+    accepted_files.value = (uint32_t)accepted_files_.load();
 
     resp.push_back(accepted_files);
 
-    state::metrics::MetricResponse input_bytes;
+    state::response::SerializedResponseNode input_bytes;
     input_bytes.name = "InputBytes";
-    input_bytes.value = std::to_string(input_bytes_.load());
+    input_bytes.value = (uint32_t)input_bytes_.load();
 
     resp.push_back(input_bytes);
 
@@ -101,7 +102,7 @@ class GetFileMetrics : public state::metrics::Metrics {
 };
 
 // GetFile Class
-class GetFile : public core::Processor, public state::metrics::MetricsSource {
+class GetFile : public core::Processor, public 
state::response::MetricsNodeSource {
  public:
   // Constructor
   /*!
@@ -156,7 +157,7 @@ class GetFile : public core::Processor, public 
state::metrics::MetricsSource {
    */
   void performListing(std::string dir, const GetFileRequest &request);
 
-  int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> 
&metric_vector);
+  int16_t 
getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> 
&metric_vector);
 
  protected:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/processors/GetTCP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GetTCP.h 
b/libminifi/include/processors/GetTCP.h
index 4d331a1..741d190 100644
--- a/libminifi/include/processors/GetTCP.h
+++ b/libminifi/include/processors/GetTCP.h
@@ -19,6 +19,8 @@
 #define __GET_TCP_H__
 
 #include <atomic>
+
+#include "../core/state/nodes/MetricsBase.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
@@ -27,7 +29,6 @@
 #include "concurrentqueue.h"
 #include "utils/ThreadPool.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "core/state/metrics/MetricsBase.h"
 #include "controllers/SSLContextService.h"
 
 namespace org {
@@ -115,17 +116,17 @@ class DataHandler {
 
 };
 
-class GetTCPMetrics : public state::metrics::Metrics {
+class GetTCPMetrics : public state::response::ResponseNode {
  public:
   GetTCPMetrics()
-      : state::metrics::Metrics("GetTCPMetrics", 0) {
+      : state::response::ResponseNode("GetTCPMetrics", 0) {
     iterations_ = 0;
     accepted_files_ = 0;
     input_bytes_ = 0;
   }
 
   GetTCPMetrics(std::string name, uuid_t uuid)
-      : state::metrics::Metrics(name, uuid) {
+      : state::response::ResponseNode(name, uuid) {
     iterations_ = 0;
     accepted_files_ = 0;
     input_bytes_ = 0;
@@ -137,24 +138,24 @@ class GetTCPMetrics : public state::metrics::Metrics {
     return core::Connectable::getName();
   }
 
-  virtual std::vector<state::metrics::MetricResponse> serialize() {
-    std::vector<state::metrics::MetricResponse> resp;
+  virtual std::vector<state::response::SerializedResponseNode> serialize() {
+    std::vector<state::response::SerializedResponseNode> resp;
 
-    state::metrics::MetricResponse iter;
+    state::response::SerializedResponseNode iter;
     iter.name = "OnTriggerInvocations";
-    iter.value = std::to_string(iterations_.load());
+    iter.value = (uint32_t)iterations_.load();
 
     resp.push_back(iter);
 
-    state::metrics::MetricResponse accepted_files;
+    state::response::SerializedResponseNode accepted_files;
     accepted_files.name = "AcceptedFiles";
-    accepted_files.value = std::to_string(accepted_files_.load());
+    accepted_files.value = (uint32_t)accepted_files_.load();
 
     resp.push_back(accepted_files);
 
-    state::metrics::MetricResponse input_bytes;
+    state::response::SerializedResponseNode input_bytes;
     input_bytes.name = "InputBytes";
-    input_bytes.value = std::to_string(input_bytes_.load());
+    input_bytes.value = (uint32_t)input_bytes_.load();
 
     resp.push_back(input_bytes);
 
@@ -171,7 +172,7 @@ class GetTCPMetrics : public state::metrics::Metrics {
 };
 
 // GetTCP Class
-class GetTCP : public core::Processor, public state::metrics::MetricsSource {
+class GetTCP : public core::Processor, public 
state::response::MetricsNodeSource {
  public:
 // Constructor
   /*!
@@ -236,7 +237,7 @@ class GetTCP : public core::Processor, public 
state::metrics::MetricsSource {
   // Initialize, over write by NiFi GetTCP
   virtual void initialize(void);
 
-  int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> 
&metric_vector);
+  int16_t 
getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> 
&metric_vector);
 
  protected:
 
@@ -287,6 +288,7 @@ class GetTCP : public core::Processor, public 
state::metrics::MetricsSource {
 
 REGISTER_RESOURCE(GetTCP);
 
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h 
b/libminifi/include/properties/Configure.h
index 1e17d53..66a9351 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -33,6 +33,7 @@ class Configure : public Properties {
   static const char *nifi_default_directory;
   static const char *nifi_c2_enable;
   static const char *nifi_flow_configuration_file;
+  static const char *nifi_flow_configuration_file_backup_update;
   static const char *nifi_flow_engine_threads;
   static const char *nifi_administrative_yield_duration;
   static const char *nifi_bored_yield_duration;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/utils/ByteArrayCallback.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ByteArrayCallback.h 
b/libminifi/include/utils/ByteArrayCallback.h
index 9ccca48..653159c 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -109,11 +109,11 @@ class ByteOutputCallback : public OutputStreamCallback {
 
   virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
 
-  const std::vector<char> to_string();
+  virtual const std::vector<char> to_string();
 
-  void close();
+  virtual void close();
 
-  size_t getSize();
+  virtual size_t getSize();
 
   bool waitingOps();
 
@@ -121,7 +121,7 @@ class ByteOutputCallback : public OutputStreamCallback {
 
   size_t readFully(char *buffer, size_t size);
 
- private:
+ protected:
 
   inline void write_and_notify(char *data, size_t size);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/utils/FileOutputCallback.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/FileOutputCallback.h 
b/libminifi/include/utils/FileOutputCallback.h
new file mode 100644
index 0000000..df2637b
--- /dev/null
+++ b/libminifi/include/utils/FileOutputCallback.h
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UTILS_FILEOUTPUTCALLBACK_H_
+#define LIBMINIFI_INCLUDE_UTILS_FILEOUTPUTCALLBACK_H_
+
+#include <fstream>
+#include "concurrentqueue.h"
+#include "FlowFileRecord.h"
+#include "ByteArrayCallback.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * General vector based uint8_t callback.
+ *
+ * While calls are thread safe, the class is intended to have
+ * a single consumer.
+ */
+class FileOutputCallback : public ByteOutputCallback {
+ public:
+  FileOutputCallback() = delete;
+
+  explicit FileOutputCallback(std::string file, bool wait_on_read=false)
+      : ByteOutputCallback(INT_MAX), file_(file), file_stream_(file),
+        logger_(logging::LoggerFactory<FileOutputCallback>::getLogger()) {
+  }
+
+  virtual ~FileOutputCallback() {
+
+  }
+
+  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+  virtual const std::vector<char> to_string() override;
+
+  virtual void close() override;
+
+  virtual size_t getSize() override;
+
+  virtual void write(char *data, size_t size) override;
+
+ private:
+
+  std::string file_;
+
+  std::ofstream file_stream_;
+
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_UTILS_BYTEARRAYCALLBACK_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/utils/HTTPClient.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/HTTPClient.h 
b/libminifi/include/utils/HTTPClient.h
index 69674be..553dcea 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -294,6 +294,7 @@ class BaseHTTPClient {
 extern std::string get_token(utils::BaseHTTPClient *client, std::string 
username, std::string password);
 
 extern void parse_url(std::string *url, std::string *host, int *port, 
std::string *protocol);
+extern void parse_url(std::string *url, std::string *host, int *port, 
std::string *protocol, std::string *path, std::string *query);
 } /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/utils/file/FileUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/file/FileUtils.h 
b/libminifi/include/utils/file/FileUtils.h
index fd14e3e..796ee1a 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -79,6 +79,15 @@ class FileUtils {
 #endif
     return -1;
   }
+  
+  static int copy_file(const std::string &path_from, const std::string 
dest_path) {
+    std::ifstream src(path_from, std::ios::binary);
+    if (!src.is_open())
+      return -1;
+    std::ofstream dest(dest_path, std::ios::binary);
+    dest << src.rdbuf();
+    return 0;
+  }
 
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index be35d6b..1d97edd 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -25,6 +25,7 @@ namespace minifi {
 const char *Configure::nifi_default_directory = "nifi.default.directory";
 const char *Configure::nifi_c2_enable = "nifi.c2.enable";
 const char *Configure::nifi_flow_configuration_file = 
"nifi.flow.configuration.file";
+const char *Configure::nifi_flow_configuration_file_backup_update = 
"nifi.flow.configuration.backup.on.update";
 const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads";
 const char *Configure::nifi_administrative_yield_duration = 
"nifi.administrative.yield.duration";
 const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index e810e0c..1414765 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -33,11 +33,15 @@
 #include <utility>
 #include <memory>
 #include <string>
-#include "core/state/metrics/QueueMetrics.h"
-#include "core/state/metrics/DeviceInformation.h"
-#include "core/state/metrics/SystemMetrics.h"
-#include "core/state/metrics/ProcessMetrics.h"
-#include "core/state/metrics/RepositoryMetrics.h"
+
+#include "core/state/nodes/AgentInformation.h"
+#include "core/state/nodes/BuildInformation.h"
+#include "core/state/nodes/DeviceInformation.h"
+#include "core/state/nodes/FlowInformation.h"
+#include "core/state/nodes/ProcessMetrics.h"
+#include "core/state/nodes/QueueMetrics.h"
+#include "core/state/nodes/RepositoryMetrics.h"
+#include "core/state/nodes/SystemMetrics.h"
 #include "core/state/ProcessorController.h"
 #include "yaml-cpp/yaml.h"
 #include "c2/C2Agent.h"
@@ -50,6 +54,7 @@
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/Connectable.h"
+#include "utils/HTTPClient.h"
 
 namespace org {
 namespace apache {
@@ -67,6 +72,8 @@ 
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
       max_timer_driven_threads_(0),
       max_event_driven_threads_(0),
       running_(false),
+      updating_(false),
+      flow_version_(nullptr),
       c2_enabled_(true),
       initialized_(false),
       provenance_repo_(provenance_repo),
@@ -166,13 +173,19 @@ bool FlowController::applyConfiguration(const std::string 
&configurePayload) {
 
   logger_->log_info("Starting to reload Flow Controller with flow control name 
%s, version %d", newRoot->getName(), newRoot->getVersion());
 
+  updating_ = true;
+
   std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   stop(true);
   waitUnload(30000);
   this->root_ = std::move(newRoot);
   loadFlowRepo();
   initialized_ = true;
-  return start();
+  bool started = start();
+
+  updating_ = false;
+
+  return started;
 }
 
 int16_t FlowController::stop(bool force, uint64_t timeToWait) {
@@ -332,53 +345,110 @@ void FlowController::initializeC2() {
   if (!c2_enabled_) {
     return;
   }
-  if (!c2_initialized_) {
-    std::string c2_enable_str;
-
-    if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) {
-      bool enable_c2 = true;
-      utils::StringUtils::StringToBool(c2_enable_str, enable_c2);
-      c2_enabled_ = enable_c2;
-      if (!c2_enabled_) {
-        return;
-      }
-    } else {
-      c2_enabled_ = true;
-    }
-    state::StateManager::initialize();
+  if (c2_initialized_)
+    return;
 
-    std::shared_ptr<c2::C2Agent> agent = 
std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()),
 std::dynamic_pointer_cast<FlowController>(shared_from_this()),
-                                                                       
configuration_);
-    registerUpdateListener(agent, agent->getHeartBeatDelay());
+  std::string c2_enable_str;
 
-    state::StateManager::startMetrics(agent->getHeartBeatDelay());
-  }
-  if (!c2_enabled_) {
-    return;
+  if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) {
+    bool enable_c2 = true;
+    utils::StringUtils::StringToBool(c2_enable_str, enable_c2);
+    c2_enabled_ = enable_c2;
+    if (!c2_enabled_) {
+      return;
+    }
+  } else {
+    c2_enabled_ = true;
   }
+  state::StateManager::initialize();
+
+  std::shared_ptr<c2::C2Agent> agent = 
std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()),
 std::dynamic_pointer_cast<FlowController>(shared_from_this()),
+                                                                     
configuration_);
+  registerUpdateListener(agent, agent->getHeartBeatDelay());
+
+  state::StateManager::startMetrics(agent->getHeartBeatDelay());
 
   c2_initialized_ = true;
-  metrics_.clear();
+  flow_version_ = std::make_shared<state::response::FlowVersion>("", 
"default", "");
+  device_information_.clear();
   component_metrics_.clear();
   component_metrics_by_id_.clear();
   std::string class_csv;
 
   if (root_ != nullptr) {
-    std::shared_ptr<state::metrics::QueueMetrics> queueMetrics = 
std::make_shared<state::metrics::QueueMetrics>();
+    std::shared_ptr<state::response::QueueMetrics> queueMetrics = 
std::make_shared<state::response::QueueMetrics>();
 
     std::map<std::string, std::shared_ptr<Connection>> connections;
     root_->getConnections(connections);
     for (auto con : connections) {
       queueMetrics->addConnection(con.second);
     }
-    metrics_[queueMetrics->getName()] = queueMetrics;
+    device_information_[queueMetrics->getName()] = queueMetrics;
 
-    std::shared_ptr<state::metrics::RepositoryMetrics> repoMetrics = 
std::make_shared<state::metrics::RepositoryMetrics>();
+    std::shared_ptr<state::response::RepositoryMetrics> repoMetrics = 
std::make_shared<state::response::RepositoryMetrics>();
 
     repoMetrics->addRepository(provenance_repo_);
     repoMetrics->addRepository(flow_file_repo_);
 
-    metrics_[repoMetrics->getName()] = repoMetrics;
+    device_information_[repoMetrics->getName()] = repoMetrics;
+  }
+
+  if (configuration_->get("nifi.c2.root.classes", class_csv)) {
+    std::vector<std::string> classes = utils::StringUtils::split(class_csv, 
",");
+
+    for (std::string clazz : classes) {
+      auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, 
clazz);
+
+      if (nullptr == ptr) {
+        logger_->log_error("No metric defined for %s", clazz);
+        continue;
+      }
+
+      std::shared_ptr<state::response::ResponseNode> processor = 
std::static_pointer_cast<state::response::ResponseNode>(ptr);
+
+      auto identifier = 
std::dynamic_pointer_cast<state::response::AgentIdentifier>(processor);
+
+      if (identifier != nullptr) {
+        std::string identifier_str;
+        if (configuration_->get("nifi.c2.agent.identifier", identifier_str) && 
!identifier_str.empty()) {
+          identifier->setIdentifier(identifier_str);
+        } else {
+          // set to the flow controller's identifier
+          identifier->setIdentifier(uuidStr_);
+        }
+
+        std::string class_str;
+        if (configuration_->get("nifi.c2.agent.class", class_str) && 
!class_str.empty()) {
+          identifier->setAgentClass(class_str);
+        } else {
+          // set to the flow controller's identifier
+          identifier->setAgentClass("default");
+        }
+      }
+
+      auto monitor = 
std::dynamic_pointer_cast<state::response::AgentMonitor>(processor);
+      if (monitor != nullptr) {
+        monitor->addRepository(provenance_repo_);
+        monitor->addRepository(flow_file_repo_);
+        monitor->setStateMonitor(shared_from_this());
+      }
+
+      auto flowMonitor = 
std::dynamic_pointer_cast<state::response::FlowMonitor>(processor);
+      std::map<std::string, std::shared_ptr<Connection>> connections;
+      root_->getConnections(connections);
+      if (flowMonitor != nullptr) {
+        for (auto con : connections) {
+          flowMonitor->addConnection(con.second);
+        }
+        flowMonitor->setStateMonitor(shared_from_this());
+
+        flowMonitor->setFlowVersion(flow_version_);
+      }
+
+      std::lock_guard<std::mutex> lock(metrics_mutex_);
+
+      root_response_nodes_[processor->getName()] = processor;
+    }
   }
 
   if (configuration_->get("nifi.flow.metrics.classes", class_csv)) {
@@ -392,11 +462,11 @@ void FlowController::initializeC2() {
         continue;
       }
 
-      std::shared_ptr<state::metrics::Metrics> processor = 
std::static_pointer_cast<state::metrics::Metrics>(ptr);
+      std::shared_ptr<state::response::ResponseNode> processor = 
std::static_pointer_cast<state::response::ResponseNode>(ptr);
 
       std::lock_guard<std::mutex> lock(metrics_mutex_);
 
-      metrics_[processor->getName()] = processor;
+      device_information_[processor->getName()] = processor;
     }
   }
 
@@ -406,11 +476,11 @@ void FlowController::initializeC2() {
   if (root_ != nullptr) {
     root_->getAllProcessors(processors);
     for (const auto &processor : processors) {
-      auto rep = 
std::dynamic_pointer_cast<state::metrics::MetricsSource>(processor);
+      auto rep = 
std::dynamic_pointer_cast<state::response::ResponseNodeSource>(processor);
       // we have a metrics source.
       if (nullptr != rep) {
-        std::vector<std::shared_ptr<state::metrics::Metrics>> metric_vector;
-        rep->getMetrics(metric_vector);
+        std::vector<std::shared_ptr<state::response::ResponseNode>> 
metric_vector;
+        rep->getResponseNodes(metric_vector);
         for (auto metric : metric_vector) {
           component_metrics_[metric->getName()] = metric;
         }
@@ -434,7 +504,7 @@ void FlowController::initializeC2() {
             std::lock_guard<std::mutex> lock(metrics_mutex_);
             auto ret = component_metrics_[clazz];
             if (nullptr == ret) {
-              ret = metrics_[clazz];
+              ret = device_information_[clazz];
             }
             if (nullptr == ret) {
               logger_->log_error("No metric defined for %s", clazz);
@@ -448,7 +518,146 @@ void FlowController::initializeC2() {
       }
     }
   }
+
+  loadC2ResponseConfiguration();
+}
+
+void FlowController::loadC2ResponseConfiguration(const std::string &prefix) {
+  std::string class_definitions;
+
+  if (configuration_->get(prefix, class_definitions)) {
+    std::vector<std::string> classes = 
utils::StringUtils::split(class_definitions, ",");
+
+    for (std::string metricsClass : classes) {
+      try {
+        std::stringstream option;
+        option << prefix << "." << metricsClass;
+
+        std::stringstream classOption;
+        classOption << option.str() << ".classes";
+
+        std::stringstream nameOption;
+        nameOption << option.str() << ".name";
+        std::string name;
+
+        if (configuration_->get(nameOption.str(), name)) {
+          std::shared_ptr<state::response::ResponseNode> new_node = 
std::make_shared<state::response::ObjectNode>(name, nullptr);
+
+          if (configuration_->get(classOption.str(), class_definitions)) {
+            std::vector<std::string> classes = 
utils::StringUtils::split(class_definitions, ",");
+
+            for (std::string clazz : classes) {
+              std::lock_guard<std::mutex> lock(metrics_mutex_);
+
+              // instantiate the object
+              auto ptr = 
core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
+
+              if (nullptr == ptr) {
+                auto metric = component_metrics_.find(clazz);
+                if (metric != component_metrics_.end()) {
+                  ptr = metric->second;
+                } else {
+                  logger_->log_error("No metric defined for %s", clazz);
+                  continue;
+                }
+              }
+
+              auto node = 
std::dynamic_pointer_cast<state::response::ResponseNode>(ptr);
+
+              
std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
+            }
+
+          } else {
+            std::stringstream optionName;
+            optionName << option.str() << "." << name;
+            auto node = loadC2ResponseConfiguration(optionName.str(), 
new_node);
+//            if (node != nullptr && new_node != node)
+            //            
std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
+          }
+
+          root_response_nodes_[name] = new_node;
+        }
+      } catch (...) {
+        logger_->log_error("Could not create metrics class %s", metricsClass);
+      }
+    }
+  }
 }
+
+std::shared_ptr<state::response::ResponseNode> 
FlowController::loadC2ResponseConfiguration(const std::string &prefix, 
std::shared_ptr<state::response::ResponseNode> prev_node) {
+  std::string class_definitions;
+  if (configuration_->get(prefix, class_definitions)) {
+    std::vector<std::string> classes = 
utils::StringUtils::split(class_definitions, ",");
+
+    for (std::string metricsClass : classes) {
+      try {
+        std::stringstream option;
+        option << prefix << "." << metricsClass;
+
+        std::stringstream classOption;
+        classOption << option.str() << ".classes";
+
+        std::stringstream nameOption;
+        nameOption << option.str() << ".name";
+        std::string name;
+
+        if (configuration_->get(nameOption.str(), name)) {
+          std::shared_ptr<state::response::ResponseNode> new_node = 
std::make_shared<state::response::ObjectNode>(name, nullptr);
+          if (name.find(",") != std::string::npos) {
+            std::vector<std::string> sub_classes = 
utils::StringUtils::split(name, ",");
+            for (std::string subClassStr : classes) {
+              auto node = loadC2ResponseConfiguration(subClassStr, prev_node);
+              if (node != nullptr)
+                
std::static_pointer_cast<state::response::ObjectNode>(prev_node)->add_node(node);
+            }
+
+          } else {
+            if (configuration_->get(classOption.str(), class_definitions)) {
+              std::vector<std::string> classes = 
utils::StringUtils::split(class_definitions, ",");
+
+              for (std::string clazz : classes) {
+                std::lock_guard<std::mutex> lock(metrics_mutex_);
+
+                // instantiate the object
+                auto ptr = 
core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
+
+                if (nullptr == ptr) {
+                  auto metric = component_metrics_.find(clazz);
+                  if (metric != component_metrics_.end()) {
+                    ptr = metric->second;
+                  } else {
+                    logger_->log_error("No metric defined for %s", clazz);
+                    continue;
+                  }
+                }
+
+                auto node = 
std::dynamic_pointer_cast<state::response::ResponseNode>(ptr);
+
+                
std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
+              }
+              if (!new_node->isEmpty())
+                
std::static_pointer_cast<state::response::ObjectNode>(prev_node)->add_node(new_node);
+
+            } else {
+              std::stringstream optionName;
+              optionName << option.str() << "." << name;
+              auto sub_node = loadC2ResponseConfiguration(optionName.str(), 
new_node);
+              
std::static_pointer_cast<state::response::ObjectNode>(prev_node)->add_node(sub_node);
+            }
+          }
+        }
+      } catch (...) {
+        logger_->log_error("Could not create metrics class %s", metricsClass);
+      }
+    }
+  }
+  return prev_node;
+}
+
+void FlowController::loadC2ResponseConfiguration() {
+  loadC2ResponseConfiguration("nifi.c2.root.class.definitions");
+}
+
 /**
  * Controller Service functions
  *
@@ -595,9 +804,37 @@ void FlowController::enableAllControllerServices() {
   controller_service_provider_->enableAllControllerServices();
 }
 
-int16_t FlowController::applyUpdate(const std::string &configuration) {
-  applyConfiguration(configuration);
-  return 0;
+int16_t FlowController::applyUpdate(const std::string &source, const 
std::string &configuration) {
+  if (!source.empty()) {
+    std::string host, protocol, path, query, url = source;
+    int port;
+    utils::parse_url(&url, &host, &port, &protocol, &path, &query);
+
+    std::string flow_id, bucket_id;
+    auto path_split = utils::StringUtils::split(path, "/");
+    for (size_t i = 0; i < path_split.size(); i++) {
+      const std::string &str = path_split.at(i);
+      if (str == "flows") {
+        if (i + 1 < path_split.size()) {
+          flow_id = path_split.at(i + 1);
+          i++;
+        }
+      }
+
+      if (str == "bucket") {
+        if (i + 1 < path_split.size()) {
+          bucket_id = path_split.at(i + 1);
+          i++;
+        }
+      }
+    }
+    flow_version_->setFlowVersion(url, bucket_id, flow_id);
+  }
+  if (applyConfiguration(configuration)) {
+    return 1;
+  } else {
+    return 0;
+  }
 }
 
 int16_t FlowController::clearConnection(const std::string &connection) {
@@ -614,10 +851,20 @@ int16_t FlowController::clearConnection(const std::string 
&connection) {
   return -1;
 }
 
-int16_t 
FlowController::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>>
 &metric_vector, uint16_t metricsClass) {
+int16_t 
FlowController::getResponseNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
 &metric_vector, uint16_t metricsClass) {
+  std::lock_guard<std::mutex> lock(metrics_mutex_);
+
+  for (auto metric : root_response_nodes_) {
+    metric_vector.push_back(metric.second);
+  }
+
+  return 0;
+}
+
+int16_t 
FlowController::getMetricsNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
 &metric_vector, uint16_t metricsClass) {
   std::lock_guard<std::mutex> lock(metrics_mutex_);
   if (metricsClass == 0) {
-    for (auto metric : metrics_) {
+    for (auto metric : device_information_) {
       metric_vector.push_back(metric.second);
     }
   } else {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index a8cc5b2..077aefe 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -8,7 +8,7 @@
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *repo
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,7 +28,8 @@
 #include "core/state/UpdateController.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
-
+#include "utils/file/FileUtils.h"
+#include "utils/file/FileManager.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -43,6 +44,7 @@ C2Agent::C2Agent(const 
std::shared_ptr<core::controller::ControllerServiceProvid
       heart_beat_period_(3000),
       max_c2_responses(5),
       logger_(logging::LoggerFactory<C2Agent>::getLogger()) {
+  allow_updates_ = true;
 
   running_configuration = std::make_shared<Configure>();
 
@@ -107,6 +109,10 @@ void C2Agent::configure(const std::shared_ptr<Configure> 
&configure, bool reconf
 
     if (protocol == nullptr) {
       protocol = 
core::ClassLoader::getDefaultClassLoader().instantiateRaw("RESTSender", 
"RESTSender");
+
+      if (!protocol) {
+        return;
+      }
       logger_->log_info("Class is RESTSender");
     }
     C2Protocol *old_protocol = 
protocol_.exchange(dynamic_cast<C2Protocol*>(protocol));
@@ -131,6 +137,31 @@ void C2Agent::configure(const std::shared_ptr<Configure> 
&configure, bool reconf
       heart_beat_period_ = 3000;
   }
 
+  std::string update_settings;
+  if (configure->get("c2.agent.update.allow", update_settings) && 
utils::StringUtils::StringToBool(update_settings, allow_updates_)) {
+    // allow the agent to be updated. we then need to get an update command to 
execute after
+  }
+
+  if (allow_updates_) {
+    if (!configure->get("c2.agent.update.command", update_command_)) {
+      char cwd[1024];
+      getcwd(cwd, sizeof(cwd));
+
+      std::stringstream command;
+      command << cwd << "/minifi.sh update";
+      update_command_ = command.str();
+    }
+
+    if (!configure->get("c2.agent.update.temp.location", update_location_)) {
+      char cwd[1024];
+      getcwd(cwd, sizeof(cwd));
+
+      std::stringstream copy_path;
+      std::stringstream command;
+
+      copy_path << cwd << "/minifi.update";
+    }
+  }
   std::string heartbeat_reporters;
   if (configure->get("c2.agent.heartbeat.reporter.classes", 
heartbeat_reporters)) {
     std::vector<std::string> reporters = 
utils::StringUtils::split(heartbeat_reporters, ",");
@@ -163,7 +194,7 @@ void C2Agent::performHeartBeat() {
 
   logger_->log_trace("Performing heartbeat");
 
-  std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_copy;
+  std::map<std::string, std::shared_ptr<state::response::ResponseNode>> 
metrics_copy;
   {
     std::lock_guard<std::timed_mutex> lock(metrics_mutex_);
     if (metrics_map_.size() > 0) {
@@ -180,7 +211,7 @@ void C2Agent::performHeartBeat() {
         continue;
       C2Payload child_metric_payload(Operation::HEARTBEAT);
       child_metric_payload.setLabel(metric.first);
-      serializeMetrics(child_metric_payload, metric.first, 
metric.second->serialize());
+      serializeMetrics(child_metric_payload, metric.first, 
metric.second->serialize(), metric.second->isArray());
       metrics.addPayload(std::move(child_metric_payload));
     }
     payload.addPayload(std::move(metrics));
@@ -188,44 +219,31 @@ void C2Agent::performHeartBeat() {
 
   if (device_information_.size() > 0) {
     C2Payload deviceInfo(Operation::HEARTBEAT);
-    deviceInfo.setLabel("DeviceInfo");
+    deviceInfo.setLabel("AgentInformation");
 
     for (auto metric : device_information_) {
       C2Payload child_metric_payload(Operation::HEARTBEAT);
       child_metric_payload.setLabel(metric.first);
-      serializeMetrics(child_metric_payload, metric.first, 
metric.second->serialize());
+      if (metric.second->isArray()) {
+        child_metric_payload.setContainer(true);
+      }
+      serializeMetrics(child_metric_payload, metric.first, 
metric.second->serialize(), metric.second->isArray());
       deviceInfo.addPayload(std::move(child_metric_payload));
     }
     payload.addPayload(std::move(deviceInfo));
   }
 
-  std::vector<std::shared_ptr<state::StateController>> components = 
update_sink_->getAllComponents();
-
-  if (!components.empty()) {
-    C2ContentResponse component_payload(Operation::HEARTBEAT);
-    component_payload.name = "Components";
-
-    for (auto &component : components) {
-      if (component->isRunning()) {
-        component_payload.operation_arguments[component->getComponentName()] = 
"enabled";
-      } else {
-        component_payload.operation_arguments[component->getComponentName()] = 
"disabled";
+  if (!root_response_nodes_.empty()) {
+    for (auto metric : root_response_nodes_) {
+      C2Payload child_metric_payload(Operation::HEARTBEAT);
+      child_metric_payload.setLabel(metric.first);
+      if (metric.second->isArray()) {
+        child_metric_payload.setContainer(true);
       }
+      serializeMetrics(child_metric_payload, metric.first, 
metric.second->serialize(), metric.second->isArray());
+      payload.addPayload(std::move(child_metric_payload));
     }
-    payload.addContent(std::move(component_payload));
-  }
-
-  C2ContentResponse state(Operation::HEARTBEAT);
-  state.name = "state";
-  if (update_sink_->isRunning()) {
-    state.operation_arguments["running"] = "true";
-  } else {
-    state.operation_arguments["running"] = "false";
   }
-  state.operation_arguments["uptime"] = 
std::to_string(update_sink_->getUptime());
-
-  payload.addContent(std::move(state));
-
   C2Payload && response = protocol_.load()->consumePayload(payload);
 
   enqueue_c2_server_response(std::move(response));
@@ -237,20 +255,17 @@ void C2Agent::performHeartBeat() {
   }
 }
 
-void C2Agent::serializeMetrics(C2Payload &metric_payload, const std::string 
&name, const std::vector<state::metrics::MetricResponse> &metrics) {
+void C2Agent::serializeMetrics(C2Payload &metric_payload, const std::string 
&name, const std::vector<state::response::SerializedResponseNode> &metrics, 
bool is_container) {
   for (auto metric : metrics) {
     if (metric.children.size() > 0) {
       C2Payload child_metric_payload(metric_payload.getOperation());
       child_metric_payload.setLabel(metric.name);
-      serializeMetrics(child_metric_payload, metric.name, metric.children);
-
+      serializeMetrics(child_metric_payload, metric.name, metric.children, 
is_container);
       metric_payload.addPayload(std::move(child_metric_payload));
     } else {
       C2ContentResponse response(metric_payload.getOperation());
       response.name = name;
-
       response.operation_arguments[metric.name] = metric.value;
-
       metric_payload.addContent(std::move(response));
     }
   }
@@ -323,9 +338,9 @@ void C2Agent::handle_c2_server_response(const 
C2ContentResponse &resp) {
     case Operation::CLEAR:
       // we've been told to clear something
       if (resp.name == "connection") {
-        logger_->log_debug("Clearing connection %s", resp.name);
         for (auto connection : resp.operation_arguments) {
-          update_sink_->clearConnection(connection.second);
+          logger_->log_debug("Clearing connection %s", 
connection.second.to_string());
+          update_sink_->clearConnection(connection.second.to_string());
         }
         C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
         enqueue_c2_response(std::move(response));
@@ -333,7 +348,6 @@ void C2Agent::handle_c2_server_response(const 
C2ContentResponse &resp) {
         update_sink_->drainRepositories();
         C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
         enqueue_c2_response(std::move(response));
-
       } else {
         logger_->log_debug("Clearing unknown %s", resp.name);
       }
@@ -392,7 +406,7 @@ void C2Agent::handle_c2_server_response(const 
C2ContentResponse &resp) {
  */
 void C2Agent::handle_describe(const C2ContentResponse &resp) {
   if (resp.name == "metrics") {
-    auto reporter = 
std::dynamic_pointer_cast<state::metrics::MetricsReporter>(update_sink_);
+    auto reporter = 
std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
 
     if (reporter != nullptr) {
       auto metricsClass = resp.operation_arguments.find("metricsClass");
@@ -400,15 +414,15 @@ void C2Agent::handle_describe(const C2ContentResponse 
&resp) {
       if (metricsClass != resp.operation_arguments.end()) {
         // we have a class
         try {
-          metric_class_id = std::stoi(metricsClass->second);
+          metric_class_id = std::stoi(metricsClass->second.to_string());
         } catch (...) {
-          logger_->log_error("Could not convert %s into an integer", 
metricsClass->second);
+          logger_->log_error("Could not convert %s into an integer", 
metricsClass->second.to_string());
         }
       }
 
-      std::vector<std::shared_ptr<state::metrics::Metrics>> metrics_vec;
+      std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
 
-      reporter->getMetrics(metrics_vec, metric_class_id);
+      reporter->getResponseNodes(metrics_vec, metric_class_id);
       C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
       response.setLabel("metrics");
       for (auto metric : metrics_vec) {
@@ -418,6 +432,26 @@ void C2Agent::handle_describe(const C2ContentResponse 
&resp) {
     }
 
   } else if (resp.name == "configuration") {
+    auto unsanitized_keys = configuration_->getConfiguredKeys();
+    std::vector<std::string> keys;
+    std::copy_if(unsanitized_keys.begin(), unsanitized_keys.end(), 
std::back_inserter(keys), [](std::string key) {return key.find("pass") == 
std::string::npos;});
+    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+    response.setLabel("configuration_options");
+    C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
+    options.setLabel("configuration_options");
+    std::string value;
+    for (auto key : keys) {
+      C2ContentResponse option(Operation::ACKNOWLEDGE);
+      option.name = key;
+      if (configuration_->get(key, value)) {
+        option.operation_arguments[key] = value;
+        options.addContent(std::move(option));
+      }
+    }
+    response.addPayload(std::move(options));
+    enqueue_c2_response(std::move(response));
+    return;
+  } else if (resp.name == "manifest") {
     auto keys = configuration_->getConfiguredKeys();
     C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
     response.setLabel("configuration_options");
@@ -433,6 +467,23 @@ void C2Agent::handle_describe(const C2ContentResponse 
&resp) {
       }
     }
     response.addPayload(std::move(options));
+
+    if (device_information_.size() > 0) {
+      C2Payload deviceInfo(Operation::HEARTBEAT);
+      deviceInfo.setLabel("AgentInformation");
+
+      for (auto metric : device_information_) {
+        C2Payload child_metric_payload(Operation::HEARTBEAT);
+        child_metric_payload.setLabel(metric.first);
+        if (metric.second->isArray()) {
+          child_metric_payload.setContainer(true);
+        }
+        serializeMetrics(child_metric_payload, metric.first, 
metric.second->serialize(), metric.second->isArray());
+        deviceInfo.addPayload(std::move(child_metric_payload));
+      }
+      response.addPayload(std::move(deviceInfo));
+    }
+
     enqueue_c2_response(std::move(response));
     return;
   }
@@ -444,13 +495,53 @@ void C2Agent::handle_update(const C2ContentResponse 
&resp) {
   // we've been told to update something
   if (resp.name == "configuration") {
     auto url = resp.operation_arguments.find("location");
+
+    auto persist = resp.operation_arguments.find("persist");
+
     if (url != resp.operation_arguments.end()) {
       // just get the raw data.
-      C2Payload payload(Operation::UPDATE, false, true);
+      C2Payload payload(Operation::TRANSFER, false, true);
+
+      C2Payload &&response = 
protocol_.load()->consumePayload(url->second.to_string(), payload, RECEIVE, 
false);
 
-      C2Payload &&response = protocol_.load()->consumePayload(url->second, 
payload, RECEIVE, false);
+      auto raw_data = response.getRawData();
+      std::string file_path = std::string(raw_data.data(), raw_data.size());
 
-      if (update_sink_->applyUpdate(response.getRawData()) == 0) {
+      std::ifstream new_conf(file_path);
+      std::string raw_data_str((std::istreambuf_iterator<char>(new_conf)), 
std::istreambuf_iterator<char>());
+      unlink(file_path.c_str());
+      // if we can apply the update, we will acknowledge it and then backup 
the configuration file.
+      if (update_sink_->applyUpdate(url->second.to_string(), raw_data_str)) {
+        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+        enqueue_c2_response(std::move(response));
+
+        if (persist != resp.operation_arguments.end() && 
utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) {
+          // update nifi.flow.configuration.file=./conf/config.yml
+          std::string config_file;
+          configuration_->get(minifi::Configure::nifi_flow_configuration_file, 
config_file);
+          std::stringstream config_file_backup;
+          config_file_backup << config_file << ".bak";
+          // we must be able to successfuly copy the file.
+          bool persist_config = true;
+          bool backup_file = false;
+          std::string backup_config;
+
+          if 
(configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update,
 backup_config) && utils::StringUtils::StringToBool(backup_config, 
backup_file)) {
+            if (utils::file::FileUtils::copy_file(config_file, 
config_file_backup.str()) != 0) {
+              persist_config = false;
+            }
+          }
+          if (persist_config) {
+            std::ofstream writer(config_file);
+            if (writer.is_open()) {
+              auto output = response.getRawData();
+              writer.write(output.data(), output.size());
+            }
+            writer.close();
+          }
+        }
+      } else {
+        logger_->log_debug("update failed.");
         C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
         enqueue_c2_response(std::move(response));
       }
@@ -458,7 +549,36 @@ void C2Agent::handle_update(const C2ContentResponse &resp) 
{
     } else {
       auto update_text = resp.operation_arguments.find("configuration_data");
       if (update_text != resp.operation_arguments.end()) {
-        update_sink_->applyUpdate(update_text->second);
+        if (update_sink_->applyUpdate(url->second.to_string(), 
update_text->second.to_string()) != 0 && persist != 
resp.operation_arguments.end()
+            && 
utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) {
+          C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+          enqueue_c2_response(std::move(response));
+          // update nifi.flow.configuration.file=./conf/config.yml
+          std::string config_file;
+          std::stringstream config_file_backup;
+          config_file_backup << config_file << ".bak";
+
+          bool persist_config = true;
+          bool backup_file = false;
+          std::string backup_config;
+
+          if 
(configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update,
 backup_config) && utils::StringUtils::StringToBool(backup_config, 
backup_file)) {
+            if (utils::file::FileUtils::copy_file(config_file, 
config_file_backup.str()) != 0) {
+              persist_config = false;
+            }
+          }
+          if (persist_config) {
+            std::ofstream writer(config_file);
+            if (writer.is_open()) {
+              auto output = update_text->second.to_string();
+              writer.write(output.c_str(), output.size());
+            }
+            writer.close();
+          }
+        } else {
+          C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+          enqueue_c2_response(std::move(response));
+        }
       }
     }
   } else if (resp.name == "c2") {
@@ -468,23 +588,69 @@ void C2Agent::handle_update(const C2ContentResponse 
&resp) {
     running_configuration->clear();
 
     for (auto entry : resp.operation_arguments) {
-      running_configuration->set(entry.first, entry.second);
+      running_configuration->set(entry.first, entry.second.to_string());
     }
 
     if (resp.operation_arguments.size() > 0)
       configure(running_configuration);
+    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+    enqueue_c2_response(std::move(response));
+  } else if (resp.name == "agent") {
+    // we are upgrading the agent. therefore we must be given a location
+    auto location = resp.operation_arguments.find("location");
+    if (location != resp.operation_arguments.end()) {
+      // we will not have a raw payload
+      C2Payload payload(Operation::TRANSFER, false, true);
+
+      C2Payload &&response = 
protocol_.load()->consumePayload(location->second.to_string(), payload, 
RECEIVE, false);
+
+      auto raw_data = response.getRawData();
+
+      std::string file_path = std::string(raw_data.data(), raw_data.size());
+
+      // acknowledge the transfer. For a transfer, the response identifier 
should be the checksum of the
+      // file transferred.
+      C2Payload transfer_response(Operation::ACKNOWLEDGE, 
response.getIdentifier(), false, true);
+
+      protocol_.load()->consumePayload(std::move(transfer_response));
+
+      if (allow_updates_) {
+        utils::file::FileUtils::copy_file(file_path, update_location_);
+        // remove the downloaded file.
+        logger_->log_trace("removing command %s", file_path);
+        unlink(file_path.c_str());
+        update_agent();
+      }
+    }
   }
 }
 
-int16_t C2Agent::setMetrics(const std::shared_ptr<state::metrics::Metrics> 
&metric) {
+void C2Agent::restart_agent() {
+  char cwd[1024];
+  getcwd(cwd, sizeof(cwd));
+
+  std::stringstream command;
+  command << cwd << "/minifi.sh restart";
+}
+
+void C2Agent::update_agent() {
+  system(update_command_.c_str());
+}
+
+int16_t C2Agent::setResponseNodes(const 
std::shared_ptr<state::response::ResponseNode> &metric) {
   auto now = std::chrono::steady_clock::now();
-  bool is_device_metric = 
std::dynamic_pointer_cast<state::metrics::DeviceMetric>(metric) != nullptr;
   if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) {
-    if (is_device_metric) {
-      device_information_[metric->getName()] = metric;
-    } else {
-      metrics_map_[metric->getName()] = metric;
-    }
+    root_response_nodes_[metric->getName()] = metric;
+    metrics_mutex_.unlock();
+    return 0;
+  }
+  return -1;
+}
+
+int16_t C2Agent::setMetricsNodes(const 
std::shared_ptr<state::response::ResponseNode> &metric) {
+  auto now = std::chrono::steady_clock::now();
+  if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) {
+    metrics_map_[metric->getName()] = metric;
     metrics_mutex_.unlock();
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/c2/C2Payload.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp
index 8a30e2a..3807b12 100644
--- a/libminifi/src/c2/C2Payload.cpp
+++ b/libminifi/src/c2/C2Payload.cpp
@@ -80,21 +80,24 @@ C2Payload::C2Payload(Operation op, std::string identifier, 
bool resp, bool isRaw
       op_(op),
       raw_(isRaw),
       ident_(identifier),
-      isResponse(resp) {
+      isResponse(resp),
+      is_container_(false) {
 }
 
 C2Payload::C2Payload(Operation op, bool resp, bool isRaw)
     : state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)),
       op_(op),
       raw_(isRaw),
-      isResponse(resp) {
+      isResponse(resp),
+      is_container_(false) {
 }
 
 C2Payload::C2Payload(Operation op, state::UpdateState state, bool resp, bool 
isRaw)
     : state::Update(state::UpdateStatus(state, 0)),
       op_(op),
       raw_(isRaw),
-      isResponse(resp) {
+      isResponse(resp),
+      is_container_(false) {
 }
 
 C2Payload::C2Payload(const C2Payload &other)
@@ -106,7 +109,8 @@ C2Payload::C2Payload(const C2Payload &other)
       ident_(other.ident_),
       raw_data_(other.raw_data_),
       payloads_(other.payloads_),
-      content_(other.content_) {
+      content_(other.content_),
+      is_container_(other.is_container_) {
 }
 
 C2Payload::C2Payload(const C2Payload &&other)
@@ -118,7 +122,8 @@ C2Payload::C2Payload(const C2Payload &&other)
       ident_(std::move(other.ident_)),
       raw_data_(std::move(other.raw_data_)),
       payloads_(std::move(other.payloads_)),
-      content_(std::move(other.content_)) {
+      content_(std::move(other.content_)),
+      is_container_(std::move(other.is_container_)) {
 }
 
 void C2Payload::setIdentifier(const std::string &ident) {
@@ -166,14 +171,14 @@ bool C2Payload::isRaw() const {
 }
 
 void C2Payload::setRawData(const std::string &data) {
-  raw_data_ = data;
+  raw_data_.insert(std::end(raw_data_), std::begin(data), std::end(data));
 }
 
 void C2Payload::setRawData(const std::vector<char> &data) {
-  raw_data_ = std::string(data.data(), data.size());
+  raw_data_.insert(std::end(raw_data_), std::begin(data), std::end(data));
 }
 
-std::string C2Payload::getRawData() const {
+std::vector<char> C2Payload::getRawData() const {
   return raw_data_;
 }
 
@@ -195,6 +200,7 @@ C2Payload &C2Payload::operator=(const C2Payload &&other) {
   label_ = std::move(other.label_);
   payloads_ = std::move(other.payloads_);
   content_ = std::move(other.content_);
+  is_container_ = std::move(other.is_container_);
   return *this;
 }
 
@@ -209,6 +215,7 @@ C2Payload &C2Payload::operator=(const C2Payload &other) {
   label_ = other.label_;
   payloads_ = other.payloads_;
   content_ = other.content_;
+  is_container_ = other.is_container_;
   return *this;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/c2/ControllerSocketProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp 
b/libminifi/src/c2/ControllerSocketProtocol.cpp
index fb5be6d..ea9cf9b 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -158,7 +158,7 @@ void ControllerSocketProtocol::initialize(const 
std::shared_ptr<core::controller
               logger_->log_debug("Connection broke");
               break;
             }
-            update_sink_->applyUpdate(configuration);
+            update_sink_->applyUpdate("ControllerSocketProtocol", 
configuration);
           }
         }
         break;
@@ -239,7 +239,7 @@ void ControllerSocketProtocol::parse_content(const 
std::vector<C2ContentResponse
     if (payload_content.name == "Components") {
       for (auto content : payload_content.operation_arguments) {
         bool is_enabled = false;
-        minifi::utils::StringUtils::StringToBool(content.second, is_enabled);
+        minifi::utils::StringUtils::StringToBool(content.second.to_string(), 
is_enabled);
         std::lock_guard<std::mutex> lock(controller_mutex_);
         component_map_[content.first] = is_enabled;
       }
@@ -262,9 +262,9 @@ int16_t ControllerSocketProtocol::heartbeat(const C2Payload 
&payload) {
               uint64_t max = 0;
               for (auto content : payload_content.operation_arguments) {
                 if (content.first == "datasize") {
-                  size = std::stol(content.second);
+                  size = std::stol(content.second.to_string());
                 } else if (content.first == "datasizemax") {
-                  max = std::stol(content.second);
+                  max = std::stol(content.second.to_string());
                 }
               }
               std::lock_guard<std::mutex> lock(controller_mutex_);

Reply via email to