Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master eb9128c37 -> a330c57a5


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/c2/protocols/RESTProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp 
b/libminifi/src/c2/protocols/RESTProtocol.cpp
index b1cebb0..9439ac6 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -37,75 +37,121 @@ const C2Payload RESTProtocol::parseJsonResponse(const 
C2Payload &payload, const
 
   try {
     rapidjson::ParseResult ok = root.Parse(response.data(), response.size());
-
     if (ok) {
       std::string requested_operation = getOperation(payload);
 
       std::string identifier;
       if (root.HasMember("operationid")) {
         identifier = root["operationid"].GetString();
+      } else if (root.HasMember("operationId")) {
+        identifier = root["operationId"].GetString();
+      } else if (root.HasMember("identifier")) {
+        identifier = root["identifier"].GetString();
       }
+      if (root["requested_operations"].Size() == 0 && 
root["requestedOperations"].Size() == 0)
+        return std::move(C2Payload(payload.getOperation(), 
state::UpdateState::READ_COMPLETE, true));
 
-      if (root.HasMember("operation") && root["operation"].GetString() == 
requested_operation) {
-        if (root["requested_operations"].Size() == 0)
-          return std::move(C2Payload(payload.getOperation(), 
state::UpdateState::READ_COMPLETE, true));
-
-        C2Payload new_payload(payload.getOperation(), 
state::UpdateState::NESTED, true);
+      C2Payload new_payload(payload.getOperation(), 
state::UpdateState::NESTED, true);
 
+      if (!identifier.empty())
         new_payload.setIdentifier(identifier);
+      auto array = root["requested_operations"].GetArray();
+      if (root["requested_operations"].Size() == 0)
+        array = root["requestedOperations"].GetArray();
+      for (const rapidjson::Value& request : array) {
+        Operation newOp = stringToOperation(request["operation"].GetString());
+        C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, 
true);
+        C2ContentResponse new_command(newOp);
+        new_command.delay = 0;
+        new_command.required = true;
+        new_command.ttl = -1;
+
+        // set the identifier if one exists
+        if (request.HasMember("operationid")) {
+          if (request["operationid"].IsNumber())
+            new_command.ident = 
std::to_string(request["operationid"].GetInt64());
+          else if (request["operationid"].IsString())
+            new_command.ident = request["operationid"].GetString();
+          else
+            throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for 
operationid"));
+          nested_payload.setIdentifier(new_command.ident);
+        } else if (request.HasMember("operationId")) {
+          if (request["operationId"].IsNumber())
+            new_command.ident = 
std::to_string(request["operationId"].GetInt64());
+          else if (request["operationId"].IsString())
+            new_command.ident = request["operationId"].GetString();
+          else
+            throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for 
operationId"));
+          nested_payload.setIdentifier(new_command.ident);
+        } else if (request.HasMember("identifier")) {
+          if (request["identifier"].IsNumber())
+            new_command.ident = 
std::to_string(request["identifier"].GetInt64());
+          else if (request["identifier"].IsString())
+            new_command.ident = request["identifier"].GetString();
+          else
+            throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for 
operationid"));
+          nested_payload.setIdentifier(new_command.ident);
+        }
 
-        for (const rapidjson::Value& request : 
root["requested_operations"].GetArray()) {
-          Operation newOp = 
stringToOperation(request["operation"].GetString());
-          C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, 
true);
-          C2ContentResponse new_command(newOp);
-          new_command.delay = 0;
-          new_command.required = true;
-          new_command.ttl = -1;
-
-          // set the identifier if one exists
-          if (request.HasMember("operationid")) {
-            if (request["operationid"].IsNumber())
-              new_command.ident = 
std::to_string(request["operationid"].GetInt64());
-            else if (request["operationid"].IsString())
-              new_command.ident = request["operationid"].GetString();
-            else
-              throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for 
operationid"));
-
-            nested_payload.setIdentifier(new_command.ident);
-          }
-
+        if (request.HasMember("name")) {
           new_command.name = request["name"].GetString();
+        } else if (request.HasMember("operand")) {
+          new_command.name = request["operand"].GetString();
+        }
 
-          if (request.HasMember("content") && request["content"].MemberCount() 
> 0)
+        if (request.HasMember("content") && request["content"].MemberCount() > 
0) {
+          if (request["content"].IsArray()) {
+            for (const auto &member : request["content"].GetArray())
+              new_command.operation_arguments[member.GetString()] = 
member.GetString();
+          } else {
             for (const auto &member : request["content"].GetObject())
               new_command.operation_arguments[member.name.GetString()] = 
member.value.GetString();
-
-          nested_payload.addContent(std::move(new_command));
-          new_payload.addPayload(std::move(nested_payload));
+          }
+        } else if (request.HasMember("args") && request["args"].MemberCount() 
> 0) {
+          if (request["args"].IsArray()) {
+            for (const auto &member : request["args"].GetArray())
+              new_command.operation_arguments[member.GetString()] = 
member.GetString();
+          } else {
+            for (const auto &member : request["args"].GetObject())
+              new_command.operation_arguments[member.name.GetString()] = 
member.value.GetString();
+          }
         }
-
-        // we have a response for this request
-        return new_payload;
+        nested_payload.addContent(std::move(new_command));
+        new_payload.addPayload(std::move(nested_payload));
       }
+
+      // we have a response for this request
+      return new_payload;
+      // }
     }
   } catch (...) {
   }
-  return std::move(C2Payload(payload.getOperation(), 
state::UpdateState::READ_ERROR, true));
+  return std::move(C2Payload(payload.getOperation(), 
state::UpdateState::READ_COMPLETE, true));
 }
 
-void setJsonStr(const std::string& key, const std::string& value, 
rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { // NOLINT
+void setJsonStr(const std::string& key, const state::response::ValueNode& 
value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) {  
// NOLINT
   rapidjson::Value keyVal;
   rapidjson::Value valueVal;
   const char* c_key = key.c_str();
-  const char* c_val = value.c_str();
 
+  auto base_type = value.getValue();
   keyVal.SetString(c_key, key.length(), alloc);
-  valueVal.SetString(c_val, value.length(), alloc);
 
+  if (auto sub_type = 
std::dynamic_pointer_cast<state::response::IntValue>(base_type)) {
+    valueVal.SetInt(sub_type->getValue());
+  } else if (auto sub_type = 
std::dynamic_pointer_cast<state::response::Int64Value>(base_type)) {
+    valueVal.SetInt64(sub_type->getValue());
+  } else if (auto sub_type = 
std::dynamic_pointer_cast<state::response::BoolValue>(base_type)) {
+    valueVal.SetBool(sub_type->getValue());
+  } else {
+    auto str = base_type->getStringValue();
+    const char* c_val = str.c_str();
+    valueVal.SetString(c_val, str.length(), alloc);
+  }
   parent.AddMember(keyVal, valueVal, alloc);
 }
 
-rapidjson::Value getStringValue(const std::string& value, 
rapidjson::Document::AllocatorType& alloc) { // NOLINT
+rapidjson::Value RESTProtocol::getStringValue(const std::string& value, 
rapidjson::Document::AllocatorType& alloc) {  // NOLINT
   rapidjson::Value Val;
   Val.SetString(value.c_str(), value.length(), alloc);
   return Val;
@@ -113,30 +159,60 @@ rapidjson::Value getStringValue(const std::string& value, 
rapidjson::Document::A
 
 void RESTProtocol::mergePayloadContent(rapidjson::Value &target, const 
C2Payload &payload, rapidjson::Document::AllocatorType &alloc) {
   const std::vector<C2ContentResponse> &content = payload.getContent();
+  bool all_empty = content.size() > 0 ? true : false;
+  bool is_parent_array = target.IsArray();
+
+  for (const auto &payload_content : content) {
+    for (auto content : payload_content.operation_arguments) {
+      if (!content.second.empty()) {
+        all_empty = false;
+        break;
+      }
+    }
+    if (!all_empty)
+      break;
+  }
+
+  if (all_empty) {
+    if (!is_parent_array) {
+      target.SetArray();
+      is_parent_array = true;
+    }
+    rapidjson::Value arr(rapidjson::kArrayType);
+    for (const auto &payload_content : content) {
+      for (auto content : payload_content.operation_arguments) {
+        rapidjson::Value keyVal;
+        keyVal.SetString(content.first.c_str(), content.first.length(), alloc);
+        if (is_parent_array)
+          target.PushBack(keyVal, alloc);
+        else
+          arr.PushBack(keyVal, alloc);
+      }
+    }
 
+    if (!is_parent_array) {
+      rapidjson::Value sub_key = getStringValue(payload.getLabel(), alloc);
+      target.AddMember(sub_key, arr, alloc);
+    }
+    return;
+  }
   for (const auto &payload_content : content) {
     rapidjson::Value payload_content_values(rapidjson::kObjectType);
     bool use_sub_option = true;
-
     if (payload_content.op == payload.getOperation()) {
       for (auto content : payload_content.operation_arguments) {
-        if (payload_content.operation_arguments.size() == 1 && 
payload_content.name == content.first) {
-          setJsonStr(payload_content.name, content.second, target, alloc);
-          use_sub_option = false;
-        } else {
-          setJsonStr(content.first, content.second, payload_content_values, 
alloc);
-        }
+        setJsonStr(content.first, content.second, target, alloc);
       }
+    } else {
     }
     if (use_sub_option) {
       rapidjson::Value sub_key = getStringValue(payload_content.name, alloc);
-      target.AddMember(sub_key, payload_content_values, alloc);
     }
   }
 }
 
 std::string RESTProtocol::serializeJsonRootPayload(const C2Payload& payload) {
-  rapidjson::Document json_payload(rapidjson::kObjectType);
+  rapidjson::Document json_payload(payload.isContainer() ? 
rapidjson::kArrayType : rapidjson::kObjectType);
   rapidjson::Document::AllocatorType &alloc = json_payload.GetAllocator();
 
   rapidjson::Value opReqStrVal;
@@ -146,63 +222,83 @@ std::string RESTProtocol::serializeJsonRootPayload(const 
C2Payload& payload) {
 
   std::string operationid = payload.getIdentifier();
   if (operationid.length() > 0) {
-    rapidjson::Value operationIdVal = getStringValue(operationid, alloc);
-    json_payload.AddMember("operationid", operationIdVal, alloc);
+    json_payload.AddMember("operationid", getStringValue(operationid, alloc), 
alloc);
+    json_payload.AddMember("operationId", getStringValue(operationid, alloc), 
alloc);
+    json_payload.AddMember("identifier", getStringValue(operationid, alloc), 
alloc);
   }
 
   mergePayloadContent(json_payload, payload, alloc);
 
   for (const auto &nested_payload : payload.getNestedPayloads()) {
-    rapidjson::Value np_key = getStringValue(nested_payload.getLabel(), alloc);
-    rapidjson::Value np_value = serializeJsonPayload(nested_payload, alloc);
-    json_payload.AddMember(np_key, np_value, alloc);
+    if (!minimize_updates_ || (minimize_updates_ && 
!containsPayload(nested_payload))) {
+      rapidjson::Value np_key = getStringValue(nested_payload.getLabel(), 
alloc);
+      rapidjson::Value np_value = serializeJsonPayload(nested_payload, alloc);
+      if (minimize_updates_) {
+        nested_payloads_.insert(std::pair<std::string, 
C2Payload>(nested_payload.getLabel(), nested_payload));
+      }
+      json_payload.AddMember(np_key, np_value, alloc);
+    }
   }
 
   rapidjson::StringBuffer buffer;
   rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
   json_payload.Accept(writer);
-
-  // std::string ret = ;
   return buffer.GetString();
 }
 
+bool RESTProtocol::containsPayload(const C2Payload &o) {
+  auto it = nested_payloads_.find(o.getLabel());
+  if (it != nested_payloads_.end()) {
+    return it->second == o;
+  }
+  return false;
+}
+
 rapidjson::Value RESTProtocol::serializeJsonPayload(const C2Payload &payload, 
rapidjson::Document::AllocatorType &alloc) {
-  // get the name from the content
-  rapidjson::Value json_payload(rapidjson::kObjectType);
+// get the name from the content
+  rapidjson::Value json_payload(payload.isContainer() ? rapidjson::kArrayType 
: rapidjson::kObjectType);
 
   std::map<std::string, std::list<rapidjson::Value*>> children;
 
   for (const auto &nested_payload : payload.getNestedPayloads()) {
     rapidjson::Value* child_payload = new 
rapidjson::Value(serializeJsonPayload(nested_payload, alloc));
+
     children[nested_payload.getLabel()].push_back(child_payload);
   }
 
-  // child_vector is Pair<string, vector<Value*>>
   for (auto child_vector : children) {
     rapidjson::Value children_json;
     rapidjson::Value newMemberKey = getStringValue(child_vector.first, alloc);
-
     if (child_vector.second.size() > 1) {
       children_json.SetArray();
-      for (auto child : child_vector.second)
-        children_json.PushBack(child->Move(), alloc);
-
-      json_payload.AddMember(newMemberKey, children_json, alloc);
+      for (auto child : child_vector.second) {
+        if (json_payload.IsArray())
+          json_payload.PushBack(child->Move(), alloc);
+        else
+          children_json.PushBack(child->Move(), alloc);
+      }
+      if (!json_payload.IsArray())
+        json_payload.AddMember(newMemberKey, children_json, alloc);
     } else if (child_vector.second.size() == 1) {
       rapidjson::Value* first = child_vector.second.front();
-
-      if (first->IsObject() && first->HasMember(newMemberKey))
-        json_payload.AddMember(newMemberKey, (*first)[newMemberKey].Move(), 
alloc);
-      else
-        json_payload.AddMember(newMemberKey, first->Move(), alloc);
+      if (first->IsObject() && first->HasMember(newMemberKey)) {
+        if (json_payload.IsArray())
+          json_payload.PushBack((*first)[newMemberKey].Move(), alloc);
+        else
+          json_payload.AddMember(newMemberKey, (*first)[newMemberKey].Move(), 
alloc);
+      } else {
+        if (json_payload.IsArray()) {
+          json_payload.PushBack(first->Move(), alloc);
+        } else {
+          json_payload.AddMember(newMemberKey, first->Move(), alloc);
+        }
+      }
     }
-
     for (rapidjson::Value* child : child_vector.second)
       delete child;
   }
 
   mergePayloadContent(json_payload, payload, alloc);
-
   return json_payload;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp 
b/libminifi/src/core/ConfigurableComponent.cpp
index d6a512e..fc8a8fd 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -18,6 +18,7 @@
 
 #include <utility>
 #include <string>
+#include <map>
 #include <vector>
 #include <set>
 
@@ -184,15 +185,13 @@ bool ConfigurableComponent::getDynamicProperty(const 
std::string name, std::stri
 bool ConfigurableComponent::createDynamicProperty(const std::string &name, 
const std::string &value) {
   if (!supportsDynamicProperties()) {
     logger_->log_debug("Attempted to create dynamic property %s, but this 
component does not support creation."
-                           "of dynamic properties.", name);
+                       "of dynamic properties.",
+                       name);
     return false;
   }
 
   Property new_property(name, DEFAULT_DYNAMIC_PROPERTY_DESC, value);
-  logger_->log_info("Processor %s dynamic property '%s' value '%s'",
-                    name.c_str(),
-                    new_property.getName().c_str(),
-                    value.c_str());
+  logger_->log_info("Processor %s dynamic property '%s' value '%s'", 
name.c_str(), new_property.getName().c_str(), value.c_str());
   dynamic_properties_[new_property.getName()] = new_property;
   onDynamicPropertyModified({}, new_property);
   return true;
@@ -232,7 +231,7 @@ bool ConfigurableComponent::updateDynamicProperty(const 
std::string &name, const
   }
 }
 
-std::vector<std::string> ConfigurableComponent::getDynamicPropertyKeys()  {
+std::vector<std::string> ConfigurableComponent::getDynamicPropertyKeys() {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
 
   std::vector<std::string> result;
@@ -244,6 +243,22 @@ std::vector<std::string> 
ConfigurableComponent::getDynamicPropertyKeys()  {
   return result;
 }
 
+std::map<std::string, std::string> ConfigurableComponent::getProperties() {
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
+
+  std::map<std::string, std::string> result;
+
+  for (const auto &pair : properties_) {
+    result.insert(std::pair<std::string, std::string>(pair.first, 
pair.second.getDescription()));
+  }
+
+  for (const auto &pair : dynamic_properties_) {
+    result.insert(std::pair<std::string, std::string>(pair.first, 
pair.second.getDescription()));
+  }
+
+  return result;
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp 
b/libminifi/src/core/ProcessGroup.cpp
index 626c4e4..205f9f2 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -43,7 +43,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string 
name, uuid_t uuid,
     : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
       name_(name),
       type_(type),
-      version_(version),
+      config_version_(version),
       parent_process_group_(parent) {
   if (!uuid)
     // Generate the global UUID for the flow record

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/core/Property.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp
index dd3a9cb..8733b59 100644
--- a/libminifi/src/core/Property.cpp
+++ b/libminifi/src/core/Property.cpp
@@ -30,7 +30,7 @@ std::string Property::getName() const {
   return name_;
 }
 // Get Description for the property
-std::string Property::getDescription() {
+std::string Property::getDescription() const {
   return description_;
 }
 // Get value for the property
@@ -65,6 +65,7 @@ bool Property::operator <(const Property & right) const {
 const Property &Property::operator=(const Property &other) {
   name_ = other.name_;
   values_ = other.values_;
+  description_ = other.description_;
   isCollection = other.isCollection;
   return *this;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/core/state/StateManager.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/state/StateManager.cpp 
b/libminifi/src/core/state/StateManager.cpp
index e14646f..ce46bc5 100644
--- a/libminifi/src/core/state/StateManager.cpp
+++ b/libminifi/src/core/state/StateManager.cpp
@@ -20,7 +20,8 @@
 #include <memory>
 #include <utility>
 #include <vector>
-#include "core/state/metrics/MetricsBase.h"
+
+#include "core/state/nodes/MetricsBase.h"
 
 namespace org {
 namespace apache {
@@ -29,9 +30,9 @@ namespace minifi {
 namespace state {
 
 void StateManager::initialize() {
-  metrics_listener_ = std::unique_ptr<state::metrics::MetricsListener>(new 
state::metrics::MetricsListener(shared_from_this(), shared_from_this()));
+  metrics_listener_ = std::unique_ptr<state::response::TreeUpdateListener>(new 
state::response::TreeUpdateListener(shared_from_this(), shared_from_this()));
   // manually add the c2 agent for now
-  listener_thread_pool_.setMaxConcurrentTasks(3);
+  listener_thread_pool_.setMaxConcurrentTasks(2);
   listener_thread_pool_.start();
   controller_running_ = true;
 }
@@ -49,7 +50,7 @@ int16_t StateManager::update(const std::shared_ptr<Update> 
&updateController) {
   if (isStateMonitorRunning()) {
     return -1;
   }
-  int16_t ret = applyUpdate(updateController);
+  int16_t ret = applyUpdate("StateManager", updateController);
   switch (ret) {
     case -1:
       return -1;
@@ -62,7 +63,7 @@ int16_t StateManager::update(const std::shared_ptr<Update> 
&updateController) {
  * Passes metrics to the update controllers if they are a metrics sink.
  * @param metrics metric to pass through
  */
-int16_t StateManager::setMetrics(const std::shared_ptr<metrics::Metrics> 
&metrics) {
+int16_t StateManager::setResponseNodes(const 
std::shared_ptr<response::ResponseNode> &metrics) {
   if (IsNullOrEmpty(metrics)) {
     return -1;
   }
@@ -70,9 +71,9 @@ int16_t StateManager::setMetrics(const 
std::shared_ptr<metrics::Metrics> &metric
   if (mutex_.try_lock_until(now + std::chrono::milliseconds(100))) {
     // update controllers can be metric sinks too
     for (auto controller : updateControllers) {
-      std::shared_ptr<metrics::MetricsSink> sink = 
std::dynamic_pointer_cast<metrics::MetricsSink>(controller);
+      std::shared_ptr<response::ResponseNodeSink> sink = 
std::dynamic_pointer_cast<response::ResponseNodeSink>(controller);
       if (sink != nullptr) {
-        sink->setMetrics(metrics);
+        sink->setResponseNodes(metrics);
       }
     }
     metrics_maps_[metrics->getName()] = metrics;
@@ -85,7 +86,7 @@ int16_t StateManager::setMetrics(const 
std::shared_ptr<metrics::Metrics> &metric
 /**
  * Metrics operations
  */
-int16_t 
StateManager::getMetrics(std::vector<std::shared_ptr<metrics::Metrics>> 
&metric_vector, uint16_t metricsClass) {
+int16_t 
StateManager::getResponseNodes(std::vector<std::shared_ptr<response::ResponseNode>>
 &metric_vector, uint16_t metricsClass) {
   auto now = std::chrono::steady_clock::now();
   const std::chrono::steady_clock::time_point wait_time = now + 
std::chrono::milliseconds(100);
   if (mutex_.try_lock_until(wait_time)) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp 
b/libminifi/src/core/yaml/YamlConfiguration.cpp
index b065eff..01cc04a 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -33,7 +33,7 @@ std::shared_ptr<utils::IdGenerator> 
YamlConfiguration::id_generator_ = utils::Id
 
 core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node 
rootFlowNode) {
   uuid_t uuid;
-  int32_t version = 0;
+  int version = 0;
 
   checkRequiredField(&rootFlowNode, "name",
                      CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
@@ -50,10 +50,7 @@ core::ProcessGroup 
*YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
   uuid_parse(id.c_str(), uuid);
 
   if (rootFlowNode["version"]) {
-    std::string value = rootFlowNode["version"].as<std::string>();
-    if (core::Property::StringToInt(value, version)) {
-      logger_->log_debug("parseRootProcessorGroup: version => [%d]", version);
-    }
+    version = rootFlowNode["version"].as<int>();
   }
 
   logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, 
flowName);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp 
b/libminifi/src/processors/GetFile.cpp
index 5d80b42..22a2b73 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -274,7 +274,7 @@ void GetFile::performListing(std::string dir, const 
GetFileRequest &request) {
   closedir(d);
 }
 
-int16_t 
GetFile::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> 
&metric_vector) {
+int16_t 
GetFile::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
 &metric_vector) {
   metric_vector.push_back(metrics_);
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/processors/GetTCP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetTCP.cpp 
b/libminifi/src/processors/GetTCP.cpp
index 79fe733..0932ebd 100644
--- a/libminifi/src/processors/GetTCP.cpp
+++ b/libminifi/src/processors/GetTCP.cpp
@@ -283,7 +283,7 @@ void GetTCP::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, con
   context->yield();
 }
 
-int16_t 
GetTCP::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> 
&metric_vector) {
+int16_t 
GetTCP::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
 &metric_vector) {
   metric_vector.push_back(metrics_);
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/utils/ByteArrayCallback.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp 
b/libminifi/src/utils/ByteArrayCallback.cpp
index 19d815f..ed9ffb7 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -1,6 +1,4 @@
 /**
- * @file SiteToSiteProvenanceReportingTask.cpp
- * SiteToSiteProvenanceReportingTask class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/utils/FileOutputCallback.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/FileOutputCallback.cpp 
b/libminifi/src/utils/FileOutputCallback.cpp
new file mode 100644
index 0000000..8951291
--- /dev/null
+++ b/libminifi/src/utils/FileOutputCallback.cpp
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "utils/FileOutputCallback.h"
+#include <vector>
+#include <utility>
+#include <string>
+#include <memory>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+int64_t FileOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  if (stream->getSize() > 0) {
+    
file_stream_.write(reinterpret_cast<char*>(const_cast<uint8_t*>(stream->getBuffer())),
 stream->getSize());
+    size_ += stream->getSize();
+  }
+  return size_.load();
+}
+
+const std::vector<char> FileOutputCallback::to_string() {
+  std::vector<char> buffer;
+  buffer.insert(std::end(buffer), std::begin(file_), std::end(file_));
+  return buffer;
+}
+
+void FileOutputCallback::close() {
+  is_alive_ = false;
+  file_stream_.close();
+}
+
+size_t FileOutputCallback::getSize() {
+  return size_;
+}
+
+void FileOutputCallback::write(char *data, size_t size) {
+  file_stream_.write(data, size);
+  size_ += size;
+}
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/utils/HTTPClient.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/HTTPClient.cpp 
b/libminifi/src/utils/HTTPClient.cpp
index e7c9d64..35d8f6b 100644
--- a/libminifi/src/utils/HTTPClient.cpp
+++ b/libminifi/src/utils/HTTPClient.cpp
@@ -80,6 +80,52 @@ void parse_url(std::string *url, std::string *host, int 
*port, std::string *prot
   }
 }
 
+void parse_url(std::string *url, std::string *host, int *port, std::string 
*protocol, std::string *path, std::string *query) {
+  std::string http("http://";);
+  std::string https("https://";);
+
+  if (url->compare(0, http.size(), http) == 0)
+    *protocol = http;
+
+  if (url->compare(0, https.size(), https) == 0)
+    *protocol = https;
+
+  if (!protocol->empty()) {
+    size_t pos = url->find_first_of(":", protocol->size());
+
+    if (pos == std::string::npos) {
+      pos = url->size();
+    }
+    size_t ppos = url->find_first_of("/", protocol->size());
+    if (pos == url->size() && ppos < url->size()) {
+      *host = url->substr(protocol->size(), ppos - protocol->size());
+    } else {
+      if (ppos < url->size())
+        *host = url->substr(protocol->size(), pos - protocol->size());
+      else
+        return;
+    }
+    if (pos < url->size() && (*url)[pos] == ':') {
+      if (ppos == std::string::npos) {
+        ppos = url->size();
+      }
+      std::string portStr(url->substr(pos + 1, ppos - pos - 1));
+      if (portStr.size() > 0) {
+        *port = std::stoi(portStr);
+      }
+    }
+
+    auto query_loc = url->find_first_of("?", ppos);
+
+    if (query_loc < url->size()) {
+      *path = url->substr(ppos + 1, query_loc - ppos - 1);
+      *query = url->substr(query_loc + 1, url->size() - query_loc - 1);
+    } else {
+      *path = url->substr(ppos + 1, url->size() - ppos - 1);
+    }
+  }
+}
+
 } /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/test/resources/TestBad.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestBad.yml 
b/libminifi/test/resources/TestBad.yml
new file mode 100644
index 0000000..8988ed0
--- /dev/null
+++ b/libminifi/test/resources/TestBad.yml
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+    - name: invoke
+      id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      class: org.apache.nifi.processors.standard.InvokeHTTP
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list:
+      Properties:
+          HTTP Method: GET
+          Remote URL: http://localhost:10003/geturl
+    - name: LogAttribute
+      id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      class: org.apache.nifi.processors.standard.LogAttribute
+      max concurrent tasks: 1
+      scheduling strategy: TIMER_DRIVEN
+      scheduling period: 1 sec
+      penalization period: 30 sec
+      yield period: 1 sec
+      run duration nanos: 0
+      auto-terminated relationships list: response
+      Properties:
+        Log Level: info
+        Log Payload: true
+
+Connections:
+    - name: TransferFilesToRPG
+      id: 2438e3c8-015a-1000-79ca-83af40ec1997
+      source name: invoke
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+      source relationship name: success
+      destination name: LogAttribute
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+    - name: TransferFilesToRPG2
+      id: 2438e3c8-015a-1000-79ca-83af40ec1917
+      source name: LogAttribute
+      source id: 2438e3c8-015a-1000-79ca-83af40ec1992
+      destination name: LogAttribute
+      destination id: 2438e3c8-015a-1000-79ca-83af40ec1992  
+      source relationship name: success
+      max work queue size: 0
+      max work queue data size: 1 MB
+      flowfile expiration: 60 sec
+
+Remotsdjglkdsgjklsdgj
+klsjdgalksdjge Processing Groups:
+    

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/test/unit/C2MetricsTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/C2MetricsTests.cpp 
b/libminifi/test/unit/C2MetricsTests.cpp
index 34843c9..231e634 100644
--- a/libminifi/test/unit/C2MetricsTests.cpp
+++ b/libminifi/test/unit/C2MetricsTests.cpp
@@ -17,18 +17,19 @@
  */
 #include <uuid/uuid.h>
 #include <memory>
+
+#include "../../include/core/state/nodes/ProcessMetrics.h"
+#include "../../include/core/state/nodes/QueueMetrics.h"
+#include "../../include/core/state/nodes/RepositoryMetrics.h"
+#include "../../include/core/state/nodes/SystemMetrics.h"
 #include "../TestBase.h"
 #include "io/ClientSocket.h"
 #include "core/Processor.h"
 #include "core/ClassLoader.h"
 #include "core/yaml/YamlConfiguration.h"
-#include "core/state/metrics/ProcessMetrics.h"
-#include "core/state/metrics/RepositoryMetrics.h"
-#include "core/state/metrics/QueueMetrics.h"
-#include "core/state/metrics/SystemMetrics.h"
 
 TEST_CASE("TestProcessMetrics", "[c2m1]") {
-  minifi::state::metrics::ProcessMetrics metrics;
+  minifi::state::response::ProcessMetrics metrics;
 
   REQUIRE("ProcessMetrics" == metrics.getName());
 
@@ -39,19 +40,18 @@ TEST_CASE("TestProcessMetrics", "[c2m1]") {
 }
 
 TEST_CASE("TestSystemMetrics", "[c2m5]") {
-  minifi::state::metrics::SystemInformation metrics;
+  minifi::state::response::SystemInformation metrics;
 
-  REQUIRE("SystemInformation" == metrics.getName());
+  REQUIRE("systeminfo" == metrics.getName());
 
-  REQUIRE(3 == metrics.serialize().size());
+  REQUIRE(2 == metrics.serialize().size());
 
-  REQUIRE("vcores" == metrics.serialize().at(0).name);
-  REQUIRE("physicalmem" == metrics.serialize().at(1).name);
-  REQUIRE("machinearch" == metrics.serialize().at(2).name);
+  REQUIRE("identifier" == metrics.serialize().at(0).name);
+  REQUIRE("systemInfo" == metrics.serialize().at(1).name);
 }
 
 TEST_CASE("QueueMetricsTestNoConnections", "[c2m2]") {
-  minifi::state::metrics::QueueMetrics metrics;
+  minifi::state::response::QueueMetrics metrics;
 
   REQUIRE("QueueMetrics" == metrics.getName());
 
@@ -59,7 +59,7 @@ TEST_CASE("QueueMetricsTestNoConnections", "[c2m2]") {
 }
 
 TEST_CASE("QueueMetricsTestConnections", "[c2m3]") {
-  minifi::state::metrics::QueueMetrics metrics;
+  minifi::state::response::QueueMetrics metrics;
 
   REQUIRE("QueueMetrics" == metrics.getName());
 
@@ -79,35 +79,35 @@ TEST_CASE("QueueMetricsTestConnections", "[c2m3]") {
 
   REQUIRE(1 == metrics.serialize().size());
 
-  minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0);
+  minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
   REQUIRE("testconnection" == resp.name);
 
   REQUIRE(4 == resp.children.size());
 
-  minifi::state::metrics::MetricResponse datasize = resp.children.at(0);
+  minifi::state::response::SerializedResponseNode datasize = 
resp.children.at(0);
 
   REQUIRE("datasize" == datasize.name);
-  REQUIRE("0" == datasize.value);
+  REQUIRE("0" == datasize.value.to_string());
 
-  minifi::state::metrics::MetricResponse datasizemax = resp.children.at(1);
+  minifi::state::response::SerializedResponseNode datasizemax = 
resp.children.at(1);
 
   REQUIRE("datasizemax" == datasizemax.name);
   REQUIRE("1024" == datasizemax.value);
 
-  minifi::state::metrics::MetricResponse queued = resp.children.at(2);
+  minifi::state::response::SerializedResponseNode queued = resp.children.at(2);
 
   REQUIRE("queued" == queued.name);
-  REQUIRE("0" == queued.value);
+  REQUIRE("0" == queued.value.to_string());
 
-  minifi::state::metrics::MetricResponse queuedmax = resp.children.at(3);
+  minifi::state::response::SerializedResponseNode queuedmax = 
resp.children.at(3);
 
   REQUIRE("queuedmax" == queuedmax.name);
-  REQUIRE("1024" == queuedmax.value);
+  REQUIRE("1024" == queuedmax.value.to_string());
 }
 
 TEST_CASE("RepositorymetricsNoRepo", "[c2m4]") {
-  minifi::state::metrics::RepositoryMetrics metrics;
+  minifi::state::response::RepositoryMetrics metrics;
 
   REQUIRE("RepositoryMetrics" == metrics.getName());
 
@@ -115,7 +115,7 @@ TEST_CASE("RepositorymetricsNoRepo", "[c2m4]") {
 }
 
 TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
-  minifi::state::metrics::RepositoryMetrics metrics;
+  minifi::state::response::RepositoryMetrics metrics;
 
   REQUIRE("RepositoryMetrics" == metrics.getName());
 
@@ -125,23 +125,23 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
   {
     REQUIRE(1 == metrics.serialize().size());
 
-    minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0);
+    minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
     REQUIRE("repo_name" == resp.name);
 
     REQUIRE(3 == resp.children.size());
 
-    minifi::state::metrics::MetricResponse running = resp.children.at(0);
+    minifi::state::response::SerializedResponseNode running = 
resp.children.at(0);
 
     REQUIRE("running" == running.name);
-    REQUIRE("0" == running.value);
+    REQUIRE("false" == running.value.to_string());
 
-    minifi::state::metrics::MetricResponse full = resp.children.at(1);
+    minifi::state::response::SerializedResponseNode full = resp.children.at(1);
 
     REQUIRE("full" == full.name);
-    REQUIRE("0" == full.value);
+    REQUIRE("false" == full.value);
 
-    minifi::state::metrics::MetricResponse size = resp.children.at(2);
+    minifi::state::response::SerializedResponseNode size = resp.children.at(2);
 
     REQUIRE("size" == size.name);
     REQUIRE("0" == size.value);
@@ -151,23 +151,23 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
   {
     REQUIRE(1 == metrics.serialize().size());
 
-    minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0);
+    minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
     REQUIRE("repo_name" == resp.name);
 
     REQUIRE(3 == resp.children.size());
 
-    minifi::state::metrics::MetricResponse running = resp.children.at(0);
+    minifi::state::response::SerializedResponseNode running = 
resp.children.at(0);
 
     REQUIRE("running" == running.name);
-    REQUIRE("1" == running.value);
+    REQUIRE("true" == running.value.to_string());
 
-    minifi::state::metrics::MetricResponse full = resp.children.at(1);
+    minifi::state::response::SerializedResponseNode full = resp.children.at(1);
 
     REQUIRE("full" == full.name);
-    REQUIRE("0" == full.value);
+    REQUIRE("false" == full.value);
 
-    minifi::state::metrics::MetricResponse size = resp.children.at(2);
+    minifi::state::response::SerializedResponseNode size = resp.children.at(2);
 
     REQUIRE("size" == size.name);
     REQUIRE("0" == size.value);
@@ -178,26 +178,26 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
   {
     REQUIRE(1 == metrics.serialize().size());
 
-    minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0);
+    minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
     REQUIRE("repo_name" == resp.name);
 
     REQUIRE(3 == resp.children.size());
 
-    minifi::state::metrics::MetricResponse running = resp.children.at(0);
+    minifi::state::response::SerializedResponseNode running = 
resp.children.at(0);
 
     REQUIRE("running" == running.name);
-    REQUIRE("1" == running.value);
+    REQUIRE("true" == running.value.to_string());
 
-    minifi::state::metrics::MetricResponse full = resp.children.at(1);
+    minifi::state::response::SerializedResponseNode full = resp.children.at(1);
 
     REQUIRE("full" == full.name);
-    REQUIRE("1" == full.value);
+    REQUIRE("true" == full.value.to_string());
 
-    minifi::state::metrics::MetricResponse size = resp.children.at(2);
+    minifi::state::response::SerializedResponseNode size = resp.children.at(2);
 
     REQUIRE("size" == size.name);
-    REQUIRE("0" == size.value);
+    REQUIRE("0" == size.value.to_string());
   }
 
   repo->stop();
@@ -205,23 +205,23 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
   {
     REQUIRE(1 == metrics.serialize().size());
 
-    minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0);
+    minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
     REQUIRE("repo_name" == resp.name);
 
     REQUIRE(3 == resp.children.size());
 
-    minifi::state::metrics::MetricResponse running = resp.children.at(0);
+    minifi::state::response::SerializedResponseNode running = 
resp.children.at(0);
 
     REQUIRE("running" == running.name);
-    REQUIRE("0" == running.value);
+    REQUIRE("false" == running.value.to_string());
 
-    minifi::state::metrics::MetricResponse full = resp.children.at(1);
+    minifi::state::response::SerializedResponseNode full = resp.children.at(1);
 
     REQUIRE("full" == full.name);
-    REQUIRE("1" == full.value);
+    REQUIRE("true" == full.value);
 
-    minifi::state::metrics::MetricResponse size = resp.children.at(2);
+    minifi::state::response::SerializedResponseNode size = resp.children.at(2);
 
     REQUIRE("size" == size.name);
     REQUIRE("0" == size.value);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/test/unit/ControllerTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ControllerTests.cpp 
b/libminifi/test/unit/ControllerTests.cpp
index c761b6e..49cb759 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -136,7 +136,7 @@ class TestUpdateSink : public minifi::state::StateMonitor {
    * < 0 is an error code
    * 0 is success
    */
-  virtual int16_t applyUpdate(const std::string &configuration) {
+  virtual int16_t applyUpdate(const std::string &source, const std::string 
&configuration) {
     update_calls++;
     return 0;
   }
@@ -145,7 +145,7 @@ class TestUpdateSink : public minifi::state::StateMonitor {
    * Apply an update that the agent must decode. This is useful for certain 
operations
    * that can't be encapsulated within these definitions.
    */
-  virtual int16_t applyUpdate(const std::shared_ptr<minifi::state::Update> 
&updateController) {
+  virtual int16_t applyUpdate(const std::string &source, const 
std::shared_ptr<minifi::state::Update> &updateController) {
     return 0;
   }
 

Reply via email to