Repository: nifi-minifi-cpp Updated Branches: refs/heads/master eb9128c37 -> a330c57a5
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/c2/protocols/RESTProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp index b1cebb0..9439ac6 100644 --- a/libminifi/src/c2/protocols/RESTProtocol.cpp +++ b/libminifi/src/c2/protocols/RESTProtocol.cpp @@ -37,75 +37,121 @@ const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const try { rapidjson::ParseResult ok = root.Parse(response.data(), response.size()); - if (ok) { std::string requested_operation = getOperation(payload); std::string identifier; if (root.HasMember("operationid")) { identifier = root["operationid"].GetString(); + } else if (root.HasMember("operationId")) { + identifier = root["operationId"].GetString(); + } else if (root.HasMember("identifier")) { + identifier = root["identifier"].GetString(); } + if (root["requested_operations"].Size() == 0 && root["requestedOperations"].Size() == 0) + return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true)); - if (root.HasMember("operation") && root["operation"].GetString() == requested_operation) { - if (root["requested_operations"].Size() == 0) - return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true)); - - C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true); + C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true); + if (!identifier.empty()) new_payload.setIdentifier(identifier); + auto array = root["requested_operations"].GetArray(); + if (root["requested_operations"].Size() == 0) + array = root["requestedOperations"].GetArray(); + for (const rapidjson::Value& request : array) { + Operation newOp = stringToOperation(request["operation"].GetString()); + C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true); + C2ContentResponse new_command(newOp); + new_command.delay = 0; + new_command.required = true; + new_command.ttl = -1; + + // set the identifier if one exists + if (request.HasMember("operationid")) { + if (request["operationid"].IsNumber()) + new_command.ident = std::to_string(request["operationid"].GetInt64()); + else if (request["operationid"].IsString()) + new_command.ident = request["operationid"].GetString(); + else + throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for operationid")); + nested_payload.setIdentifier(new_command.ident); + } else if (request.HasMember("operationId")) { + if (request["operationId"].IsNumber()) + new_command.ident = std::to_string(request["operationId"].GetInt64()); + else if (request["operationId"].IsString()) + new_command.ident = request["operationId"].GetString(); + else + throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for operationId")); + nested_payload.setIdentifier(new_command.ident); + } else if (request.HasMember("identifier")) { + if (request["identifier"].IsNumber()) + new_command.ident = std::to_string(request["identifier"].GetInt64()); + else if (request["identifier"].IsString()) + new_command.ident = request["identifier"].GetString(); + else + throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for operationid")); + nested_payload.setIdentifier(new_command.ident); + } - for (const rapidjson::Value& request : root["requested_operations"].GetArray()) { - Operation newOp = stringToOperation(request["operation"].GetString()); - C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true); - C2ContentResponse new_command(newOp); - new_command.delay = 0; - new_command.required = true; - new_command.ttl = -1; - - // set the identifier if one exists - if (request.HasMember("operationid")) { - if (request["operationid"].IsNumber()) - new_command.ident = std::to_string(request["operationid"].GetInt64()); - else if (request["operationid"].IsString()) - new_command.ident = request["operationid"].GetString(); - else - throw(Exception(SITE2SITE_EXCEPTION, "Invalid type for operationid")); - - nested_payload.setIdentifier(new_command.ident); - } - + if (request.HasMember("name")) { new_command.name = request["name"].GetString(); + } else if (request.HasMember("operand")) { + new_command.name = request["operand"].GetString(); + } - if (request.HasMember("content") && request["content"].MemberCount() > 0) + if (request.HasMember("content") && request["content"].MemberCount() > 0) { + if (request["content"].IsArray()) { + for (const auto &member : request["content"].GetArray()) + new_command.operation_arguments[member.GetString()] = member.GetString(); + } else { for (const auto &member : request["content"].GetObject()) new_command.operation_arguments[member.name.GetString()] = member.value.GetString(); - - nested_payload.addContent(std::move(new_command)); - new_payload.addPayload(std::move(nested_payload)); + } + } else if (request.HasMember("args") && request["args"].MemberCount() > 0) { + if (request["args"].IsArray()) { + for (const auto &member : request["args"].GetArray()) + new_command.operation_arguments[member.GetString()] = member.GetString(); + } else { + for (const auto &member : request["args"].GetObject()) + new_command.operation_arguments[member.name.GetString()] = member.value.GetString(); + } } - - // we have a response for this request - return new_payload; + nested_payload.addContent(std::move(new_command)); + new_payload.addPayload(std::move(nested_payload)); } + + // we have a response for this request + return new_payload; + // } } } catch (...) { } - return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true)); + return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true)); } -void setJsonStr(const std::string& key, const std::string& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { // NOLINT +void setJsonStr(const std::string& key, const state::response::ValueNode& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { // NOLINT rapidjson::Value keyVal; rapidjson::Value valueVal; const char* c_key = key.c_str(); - const char* c_val = value.c_str(); + auto base_type = value.getValue(); keyVal.SetString(c_key, key.length(), alloc); - valueVal.SetString(c_val, value.length(), alloc); + if (auto sub_type = std::dynamic_pointer_cast<state::response::IntValue>(base_type)) { + valueVal.SetInt(sub_type->getValue()); + } else if (auto sub_type = std::dynamic_pointer_cast<state::response::Int64Value>(base_type)) { + valueVal.SetInt64(sub_type->getValue()); + } else if (auto sub_type = std::dynamic_pointer_cast<state::response::BoolValue>(base_type)) { + valueVal.SetBool(sub_type->getValue()); + } else { + auto str = base_type->getStringValue(); + const char* c_val = str.c_str(); + valueVal.SetString(c_val, str.length(), alloc); + } parent.AddMember(keyVal, valueVal, alloc); } -rapidjson::Value getStringValue(const std::string& value, rapidjson::Document::AllocatorType& alloc) { // NOLINT +rapidjson::Value RESTProtocol::getStringValue(const std::string& value, rapidjson::Document::AllocatorType& alloc) { // NOLINT rapidjson::Value Val; Val.SetString(value.c_str(), value.length(), alloc); return Val; @@ -113,30 +159,60 @@ rapidjson::Value getStringValue(const std::string& value, rapidjson::Document::A void RESTProtocol::mergePayloadContent(rapidjson::Value &target, const C2Payload &payload, rapidjson::Document::AllocatorType &alloc) { const std::vector<C2ContentResponse> &content = payload.getContent(); + bool all_empty = content.size() > 0 ? true : false; + bool is_parent_array = target.IsArray(); + + for (const auto &payload_content : content) { + for (auto content : payload_content.operation_arguments) { + if (!content.second.empty()) { + all_empty = false; + break; + } + } + if (!all_empty) + break; + } + + if (all_empty) { + if (!is_parent_array) { + target.SetArray(); + is_parent_array = true; + } + rapidjson::Value arr(rapidjson::kArrayType); + for (const auto &payload_content : content) { + for (auto content : payload_content.operation_arguments) { + rapidjson::Value keyVal; + keyVal.SetString(content.first.c_str(), content.first.length(), alloc); + if (is_parent_array) + target.PushBack(keyVal, alloc); + else + arr.PushBack(keyVal, alloc); + } + } + if (!is_parent_array) { + rapidjson::Value sub_key = getStringValue(payload.getLabel(), alloc); + target.AddMember(sub_key, arr, alloc); + } + return; + } for (const auto &payload_content : content) { rapidjson::Value payload_content_values(rapidjson::kObjectType); bool use_sub_option = true; - if (payload_content.op == payload.getOperation()) { for (auto content : payload_content.operation_arguments) { - if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { - setJsonStr(payload_content.name, content.second, target, alloc); - use_sub_option = false; - } else { - setJsonStr(content.first, content.second, payload_content_values, alloc); - } + setJsonStr(content.first, content.second, target, alloc); } + } else { } if (use_sub_option) { rapidjson::Value sub_key = getStringValue(payload_content.name, alloc); - target.AddMember(sub_key, payload_content_values, alloc); } } } std::string RESTProtocol::serializeJsonRootPayload(const C2Payload& payload) { - rapidjson::Document json_payload(rapidjson::kObjectType); + rapidjson::Document json_payload(payload.isContainer() ? rapidjson::kArrayType : rapidjson::kObjectType); rapidjson::Document::AllocatorType &alloc = json_payload.GetAllocator(); rapidjson::Value opReqStrVal; @@ -146,63 +222,83 @@ std::string RESTProtocol::serializeJsonRootPayload(const C2Payload& payload) { std::string operationid = payload.getIdentifier(); if (operationid.length() > 0) { - rapidjson::Value operationIdVal = getStringValue(operationid, alloc); - json_payload.AddMember("operationid", operationIdVal, alloc); + json_payload.AddMember("operationid", getStringValue(operationid, alloc), alloc); + json_payload.AddMember("operationId", getStringValue(operationid, alloc), alloc); + json_payload.AddMember("identifier", getStringValue(operationid, alloc), alloc); } mergePayloadContent(json_payload, payload, alloc); for (const auto &nested_payload : payload.getNestedPayloads()) { - rapidjson::Value np_key = getStringValue(nested_payload.getLabel(), alloc); - rapidjson::Value np_value = serializeJsonPayload(nested_payload, alloc); - json_payload.AddMember(np_key, np_value, alloc); + if (!minimize_updates_ || (minimize_updates_ && !containsPayload(nested_payload))) { + rapidjson::Value np_key = getStringValue(nested_payload.getLabel(), alloc); + rapidjson::Value np_value = serializeJsonPayload(nested_payload, alloc); + if (minimize_updates_) { + nested_payloads_.insert(std::pair<std::string, C2Payload>(nested_payload.getLabel(), nested_payload)); + } + json_payload.AddMember(np_key, np_value, alloc); + } } rapidjson::StringBuffer buffer; rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer); json_payload.Accept(writer); - - // std::string ret = ; return buffer.GetString(); } +bool RESTProtocol::containsPayload(const C2Payload &o) { + auto it = nested_payloads_.find(o.getLabel()); + if (it != nested_payloads_.end()) { + return it->second == o; + } + return false; +} + rapidjson::Value RESTProtocol::serializeJsonPayload(const C2Payload &payload, rapidjson::Document::AllocatorType &alloc) { - // get the name from the content - rapidjson::Value json_payload(rapidjson::kObjectType); +// get the name from the content + rapidjson::Value json_payload(payload.isContainer() ? rapidjson::kArrayType : rapidjson::kObjectType); std::map<std::string, std::list<rapidjson::Value*>> children; for (const auto &nested_payload : payload.getNestedPayloads()) { rapidjson::Value* child_payload = new rapidjson::Value(serializeJsonPayload(nested_payload, alloc)); + children[nested_payload.getLabel()].push_back(child_payload); } - // child_vector is Pair<string, vector<Value*>> for (auto child_vector : children) { rapidjson::Value children_json; rapidjson::Value newMemberKey = getStringValue(child_vector.first, alloc); - if (child_vector.second.size() > 1) { children_json.SetArray(); - for (auto child : child_vector.second) - children_json.PushBack(child->Move(), alloc); - - json_payload.AddMember(newMemberKey, children_json, alloc); + for (auto child : child_vector.second) { + if (json_payload.IsArray()) + json_payload.PushBack(child->Move(), alloc); + else + children_json.PushBack(child->Move(), alloc); + } + if (!json_payload.IsArray()) + json_payload.AddMember(newMemberKey, children_json, alloc); } else if (child_vector.second.size() == 1) { rapidjson::Value* first = child_vector.second.front(); - - if (first->IsObject() && first->HasMember(newMemberKey)) - json_payload.AddMember(newMemberKey, (*first)[newMemberKey].Move(), alloc); - else - json_payload.AddMember(newMemberKey, first->Move(), alloc); + if (first->IsObject() && first->HasMember(newMemberKey)) { + if (json_payload.IsArray()) + json_payload.PushBack((*first)[newMemberKey].Move(), alloc); + else + json_payload.AddMember(newMemberKey, (*first)[newMemberKey].Move(), alloc); + } else { + if (json_payload.IsArray()) { + json_payload.PushBack(first->Move(), alloc); + } else { + json_payload.AddMember(newMemberKey, first->Move(), alloc); + } + } } - for (rapidjson::Value* child : child_vector.second) delete child; } mergePayloadContent(json_payload, payload, alloc); - return json_payload; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/core/ConfigurableComponent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index d6a512e..fc8a8fd 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -18,6 +18,7 @@ #include <utility> #include <string> +#include <map> #include <vector> #include <set> @@ -184,15 +185,13 @@ bool ConfigurableComponent::getDynamicProperty(const std::string name, std::stri bool ConfigurableComponent::createDynamicProperty(const std::string &name, const std::string &value) { if (!supportsDynamicProperties()) { logger_->log_debug("Attempted to create dynamic property %s, but this component does not support creation." - "of dynamic properties.", name); + "of dynamic properties.", + name); return false; } Property new_property(name, DEFAULT_DYNAMIC_PROPERTY_DESC, value); - logger_->log_info("Processor %s dynamic property '%s' value '%s'", - name.c_str(), - new_property.getName().c_str(), - value.c_str()); + logger_->log_info("Processor %s dynamic property '%s' value '%s'", name.c_str(), new_property.getName().c_str(), value.c_str()); dynamic_properties_[new_property.getName()] = new_property; onDynamicPropertyModified({}, new_property); return true; @@ -232,7 +231,7 @@ bool ConfigurableComponent::updateDynamicProperty(const std::string &name, const } } -std::vector<std::string> ConfigurableComponent::getDynamicPropertyKeys() { +std::vector<std::string> ConfigurableComponent::getDynamicPropertyKeys() { std::lock_guard<std::mutex> lock(configuration_mutex_); std::vector<std::string> result; @@ -244,6 +243,22 @@ std::vector<std::string> ConfigurableComponent::getDynamicPropertyKeys() { return result; } +std::map<std::string, std::string> ConfigurableComponent::getProperties() { + std::lock_guard<std::mutex> lock(configuration_mutex_); + + std::map<std::string, std::string> result; + + for (const auto &pair : properties_) { + result.insert(std::pair<std::string, std::string>(pair.first, pair.second.getDescription())); + } + + for (const auto &pair : dynamic_properties_) { + result.insert(std::pair<std::string, std::string>(pair.first, pair.second.getDescription())); + } + + return result; +} + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 626c4e4..205f9f2 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -43,7 +43,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()), name_(name), type_(type), - version_(version), + config_version_(version), parent_process_group_(parent) { if (!uuid) // Generate the global UUID for the flow record http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/core/Property.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp index dd3a9cb..8733b59 100644 --- a/libminifi/src/core/Property.cpp +++ b/libminifi/src/core/Property.cpp @@ -30,7 +30,7 @@ std::string Property::getName() const { return name_; } // Get Description for the property -std::string Property::getDescription() { +std::string Property::getDescription() const { return description_; } // Get value for the property @@ -65,6 +65,7 @@ bool Property::operator <(const Property & right) const { const Property &Property::operator=(const Property &other) { name_ = other.name_; values_ = other.values_; + description_ = other.description_; isCollection = other.isCollection; return *this; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/core/state/StateManager.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/state/StateManager.cpp b/libminifi/src/core/state/StateManager.cpp index e14646f..ce46bc5 100644 --- a/libminifi/src/core/state/StateManager.cpp +++ b/libminifi/src/core/state/StateManager.cpp @@ -20,7 +20,8 @@ #include <memory> #include <utility> #include <vector> -#include "core/state/metrics/MetricsBase.h" + +#include "core/state/nodes/MetricsBase.h" namespace org { namespace apache { @@ -29,9 +30,9 @@ namespace minifi { namespace state { void StateManager::initialize() { - metrics_listener_ = std::unique_ptr<state::metrics::MetricsListener>(new state::metrics::MetricsListener(shared_from_this(), shared_from_this())); + metrics_listener_ = std::unique_ptr<state::response::TreeUpdateListener>(new state::response::TreeUpdateListener(shared_from_this(), shared_from_this())); // manually add the c2 agent for now - listener_thread_pool_.setMaxConcurrentTasks(3); + listener_thread_pool_.setMaxConcurrentTasks(2); listener_thread_pool_.start(); controller_running_ = true; } @@ -49,7 +50,7 @@ int16_t StateManager::update(const std::shared_ptr<Update> &updateController) { if (isStateMonitorRunning()) { return -1; } - int16_t ret = applyUpdate(updateController); + int16_t ret = applyUpdate("StateManager", updateController); switch (ret) { case -1: return -1; @@ -62,7 +63,7 @@ int16_t StateManager::update(const std::shared_ptr<Update> &updateController) { * Passes metrics to the update controllers if they are a metrics sink. * @param metrics metric to pass through */ -int16_t StateManager::setMetrics(const std::shared_ptr<metrics::Metrics> &metrics) { +int16_t StateManager::setResponseNodes(const std::shared_ptr<response::ResponseNode> &metrics) { if (IsNullOrEmpty(metrics)) { return -1; } @@ -70,9 +71,9 @@ int16_t StateManager::setMetrics(const std::shared_ptr<metrics::Metrics> &metric if (mutex_.try_lock_until(now + std::chrono::milliseconds(100))) { // update controllers can be metric sinks too for (auto controller : updateControllers) { - std::shared_ptr<metrics::MetricsSink> sink = std::dynamic_pointer_cast<metrics::MetricsSink>(controller); + std::shared_ptr<response::ResponseNodeSink> sink = std::dynamic_pointer_cast<response::ResponseNodeSink>(controller); if (sink != nullptr) { - sink->setMetrics(metrics); + sink->setResponseNodes(metrics); } } metrics_maps_[metrics->getName()] = metrics; @@ -85,7 +86,7 @@ int16_t StateManager::setMetrics(const std::shared_ptr<metrics::Metrics> &metric /** * Metrics operations */ -int16_t StateManager::getMetrics(std::vector<std::shared_ptr<metrics::Metrics>> &metric_vector, uint16_t metricsClass) { +int16_t StateManager::getResponseNodes(std::vector<std::shared_ptr<response::ResponseNode>> &metric_vector, uint16_t metricsClass) { auto now = std::chrono::steady_clock::now(); const std::chrono::steady_clock::time_point wait_time = now + std::chrono::milliseconds(100); if (mutex_.try_lock_until(wait_time)) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index b065eff..01cc04a 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -33,7 +33,7 @@ std::shared_ptr<utils::IdGenerator> YamlConfiguration::id_generator_ = utils::Id core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { uuid_t uuid; - int32_t version = 0; + int version = 0; checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); @@ -50,10 +50,7 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root uuid_parse(id.c_str(), uuid); if (rootFlowNode["version"]) { - std::string value = rootFlowNode["version"].as<std::string>(); - if (core::Property::StringToInt(value, version)) { - logger_->log_debug("parseRootProcessorGroup: version => [%d]", version); - } + version = rootFlowNode["version"].as<int>(); } logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", id, flowName); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/processors/GetFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp index 5d80b42..22a2b73 100644 --- a/libminifi/src/processors/GetFile.cpp +++ b/libminifi/src/processors/GetFile.cpp @@ -274,7 +274,7 @@ void GetFile::performListing(std::string dir, const GetFileRequest &request) { closedir(d); } -int16_t GetFile::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector) { +int16_t GetFile::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector) { metric_vector.push_back(metrics_); return 0; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/processors/GetTCP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetTCP.cpp b/libminifi/src/processors/GetTCP.cpp index 79fe733..0932ebd 100644 --- a/libminifi/src/processors/GetTCP.cpp +++ b/libminifi/src/processors/GetTCP.cpp @@ -283,7 +283,7 @@ void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, con context->yield(); } -int16_t GetTCP::getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector) { +int16_t GetTCP::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector) { metric_vector.push_back(metrics_); return 0; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/utils/ByteArrayCallback.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp index 19d815f..ed9ffb7 100644 --- a/libminifi/src/utils/ByteArrayCallback.cpp +++ b/libminifi/src/utils/ByteArrayCallback.cpp @@ -1,6 +1,4 @@ /** - * @file SiteToSiteProvenanceReportingTask.cpp - * SiteToSiteProvenanceReportingTask class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/utils/FileOutputCallback.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/utils/FileOutputCallback.cpp b/libminifi/src/utils/FileOutputCallback.cpp new file mode 100644 index 0000000..8951291 --- /dev/null +++ b/libminifi/src/utils/FileOutputCallback.cpp @@ -0,0 +1,61 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "utils/FileOutputCallback.h" +#include <vector> +#include <utility> +#include <string> +#include <memory> +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +int64_t FileOutputCallback::process(std::shared_ptr<io::BaseStream> stream) { + if (stream->getSize() > 0) { + file_stream_.write(reinterpret_cast<char*>(const_cast<uint8_t*>(stream->getBuffer())), stream->getSize()); + size_ += stream->getSize(); + } + return size_.load(); +} + +const std::vector<char> FileOutputCallback::to_string() { + std::vector<char> buffer; + buffer.insert(std::end(buffer), std::begin(file_), std::end(file_)); + return buffer; +} + +void FileOutputCallback::close() { + is_alive_ = false; + file_stream_.close(); +} + +size_t FileOutputCallback::getSize() { + return size_; +} + +void FileOutputCallback::write(char *data, size_t size) { + file_stream_.write(data, size); + size_ += size; +} + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/src/utils/HTTPClient.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/utils/HTTPClient.cpp b/libminifi/src/utils/HTTPClient.cpp index e7c9d64..35d8f6b 100644 --- a/libminifi/src/utils/HTTPClient.cpp +++ b/libminifi/src/utils/HTTPClient.cpp @@ -80,6 +80,52 @@ void parse_url(std::string *url, std::string *host, int *port, std::string *prot } } +void parse_url(std::string *url, std::string *host, int *port, std::string *protocol, std::string *path, std::string *query) { + std::string http("http://"); + std::string https("https://"); + + if (url->compare(0, http.size(), http) == 0) + *protocol = http; + + if (url->compare(0, https.size(), https) == 0) + *protocol = https; + + if (!protocol->empty()) { + size_t pos = url->find_first_of(":", protocol->size()); + + if (pos == std::string::npos) { + pos = url->size(); + } + size_t ppos = url->find_first_of("/", protocol->size()); + if (pos == url->size() && ppos < url->size()) { + *host = url->substr(protocol->size(), ppos - protocol->size()); + } else { + if (ppos < url->size()) + *host = url->substr(protocol->size(), pos - protocol->size()); + else + return; + } + if (pos < url->size() && (*url)[pos] == ':') { + if (ppos == std::string::npos) { + ppos = url->size(); + } + std::string portStr(url->substr(pos + 1, ppos - pos - 1)); + if (portStr.size() > 0) { + *port = std::stoi(portStr); + } + } + + auto query_loc = url->find_first_of("?", ppos); + + if (query_loc < url->size()) { + *path = url->substr(ppos + 1, query_loc - ppos - 1); + *query = url->substr(query_loc + 1, url->size() - query_loc - 1); + } else { + *path = url->substr(ppos + 1, url->size() - ppos - 1); + } + } +} + } /* namespace utils */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/test/resources/TestBad.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/TestBad.yml b/libminifi/test/resources/TestBad.yml new file mode 100644 index 0000000..8988ed0 --- /dev/null +++ b/libminifi/test/resources/TestBad.yml @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +Flow Controller: + name: MiNiFi Flow + id: 2438e3c8-015a-1000-79ca-83af40ec1990 +Processors: + - name: invoke + id: 2438e3c8-015a-1000-79ca-83af40ec1991 + class: org.apache.nifi.processors.standard.InvokeHTTP + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + HTTP Method: GET + Remote URL: http://localhost:10003/geturl + - name: LogAttribute + id: 2438e3c8-015a-1000-79ca-83af40ec1992 + class: org.apache.nifi.processors.standard.LogAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: response + Properties: + Log Level: info + Log Payload: true + +Connections: + - name: TransferFilesToRPG + id: 2438e3c8-015a-1000-79ca-83af40ec1997 + source name: invoke + source id: 2438e3c8-015a-1000-79ca-83af40ec1991 + source relationship name: success + destination name: LogAttribute + destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + - name: TransferFilesToRPG2 + id: 2438e3c8-015a-1000-79ca-83af40ec1917 + source name: LogAttribute + source id: 2438e3c8-015a-1000-79ca-83af40ec1992 + destination name: LogAttribute + destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 + source relationship name: success + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + +Remotsdjglkdsgjklsdgj +klsjdgalksdjge Processing Groups: + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/test/unit/C2MetricsTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/C2MetricsTests.cpp b/libminifi/test/unit/C2MetricsTests.cpp index 34843c9..231e634 100644 --- a/libminifi/test/unit/C2MetricsTests.cpp +++ b/libminifi/test/unit/C2MetricsTests.cpp @@ -17,18 +17,19 @@ */ #include <uuid/uuid.h> #include <memory> + +#include "../../include/core/state/nodes/ProcessMetrics.h" +#include "../../include/core/state/nodes/QueueMetrics.h" +#include "../../include/core/state/nodes/RepositoryMetrics.h" +#include "../../include/core/state/nodes/SystemMetrics.h" #include "../TestBase.h" #include "io/ClientSocket.h" #include "core/Processor.h" #include "core/ClassLoader.h" #include "core/yaml/YamlConfiguration.h" -#include "core/state/metrics/ProcessMetrics.h" -#include "core/state/metrics/RepositoryMetrics.h" -#include "core/state/metrics/QueueMetrics.h" -#include "core/state/metrics/SystemMetrics.h" TEST_CASE("TestProcessMetrics", "[c2m1]") { - minifi::state::metrics::ProcessMetrics metrics; + minifi::state::response::ProcessMetrics metrics; REQUIRE("ProcessMetrics" == metrics.getName()); @@ -39,19 +40,18 @@ TEST_CASE("TestProcessMetrics", "[c2m1]") { } TEST_CASE("TestSystemMetrics", "[c2m5]") { - minifi::state::metrics::SystemInformation metrics; + minifi::state::response::SystemInformation metrics; - REQUIRE("SystemInformation" == metrics.getName()); + REQUIRE("systeminfo" == metrics.getName()); - REQUIRE(3 == metrics.serialize().size()); + REQUIRE(2 == metrics.serialize().size()); - REQUIRE("vcores" == metrics.serialize().at(0).name); - REQUIRE("physicalmem" == metrics.serialize().at(1).name); - REQUIRE("machinearch" == metrics.serialize().at(2).name); + REQUIRE("identifier" == metrics.serialize().at(0).name); + REQUIRE("systemInfo" == metrics.serialize().at(1).name); } TEST_CASE("QueueMetricsTestNoConnections", "[c2m2]") { - minifi::state::metrics::QueueMetrics metrics; + minifi::state::response::QueueMetrics metrics; REQUIRE("QueueMetrics" == metrics.getName()); @@ -59,7 +59,7 @@ TEST_CASE("QueueMetricsTestNoConnections", "[c2m2]") { } TEST_CASE("QueueMetricsTestConnections", "[c2m3]") { - minifi::state::metrics::QueueMetrics metrics; + minifi::state::response::QueueMetrics metrics; REQUIRE("QueueMetrics" == metrics.getName()); @@ -79,35 +79,35 @@ TEST_CASE("QueueMetricsTestConnections", "[c2m3]") { REQUIRE(1 == metrics.serialize().size()); - minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0); + minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("testconnection" == resp.name); REQUIRE(4 == resp.children.size()); - minifi::state::metrics::MetricResponse datasize = resp.children.at(0); + minifi::state::response::SerializedResponseNode datasize = resp.children.at(0); REQUIRE("datasize" == datasize.name); - REQUIRE("0" == datasize.value); + REQUIRE("0" == datasize.value.to_string()); - minifi::state::metrics::MetricResponse datasizemax = resp.children.at(1); + minifi::state::response::SerializedResponseNode datasizemax = resp.children.at(1); REQUIRE("datasizemax" == datasizemax.name); REQUIRE("1024" == datasizemax.value); - minifi::state::metrics::MetricResponse queued = resp.children.at(2); + minifi::state::response::SerializedResponseNode queued = resp.children.at(2); REQUIRE("queued" == queued.name); - REQUIRE("0" == queued.value); + REQUIRE("0" == queued.value.to_string()); - minifi::state::metrics::MetricResponse queuedmax = resp.children.at(3); + minifi::state::response::SerializedResponseNode queuedmax = resp.children.at(3); REQUIRE("queuedmax" == queuedmax.name); - REQUIRE("1024" == queuedmax.value); + REQUIRE("1024" == queuedmax.value.to_string()); } TEST_CASE("RepositorymetricsNoRepo", "[c2m4]") { - minifi::state::metrics::RepositoryMetrics metrics; + minifi::state::response::RepositoryMetrics metrics; REQUIRE("RepositoryMetrics" == metrics.getName()); @@ -115,7 +115,7 @@ TEST_CASE("RepositorymetricsNoRepo", "[c2m4]") { } TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { - minifi::state::metrics::RepositoryMetrics metrics; + minifi::state::response::RepositoryMetrics metrics; REQUIRE("RepositoryMetrics" == metrics.getName()); @@ -125,23 +125,23 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { { REQUIRE(1 == metrics.serialize().size()); - minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0); + minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); REQUIRE(3 == resp.children.size()); - minifi::state::metrics::MetricResponse running = resp.children.at(0); + minifi::state::response::SerializedResponseNode running = resp.children.at(0); REQUIRE("running" == running.name); - REQUIRE("0" == running.value); + REQUIRE("false" == running.value.to_string()); - minifi::state::metrics::MetricResponse full = resp.children.at(1); + minifi::state::response::SerializedResponseNode full = resp.children.at(1); REQUIRE("full" == full.name); - REQUIRE("0" == full.value); + REQUIRE("false" == full.value); - minifi::state::metrics::MetricResponse size = resp.children.at(2); + minifi::state::response::SerializedResponseNode size = resp.children.at(2); REQUIRE("size" == size.name); REQUIRE("0" == size.value); @@ -151,23 +151,23 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { { REQUIRE(1 == metrics.serialize().size()); - minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0); + minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); REQUIRE(3 == resp.children.size()); - minifi::state::metrics::MetricResponse running = resp.children.at(0); + minifi::state::response::SerializedResponseNode running = resp.children.at(0); REQUIRE("running" == running.name); - REQUIRE("1" == running.value); + REQUIRE("true" == running.value.to_string()); - minifi::state::metrics::MetricResponse full = resp.children.at(1); + minifi::state::response::SerializedResponseNode full = resp.children.at(1); REQUIRE("full" == full.name); - REQUIRE("0" == full.value); + REQUIRE("false" == full.value); - minifi::state::metrics::MetricResponse size = resp.children.at(2); + minifi::state::response::SerializedResponseNode size = resp.children.at(2); REQUIRE("size" == size.name); REQUIRE("0" == size.value); @@ -178,26 +178,26 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { { REQUIRE(1 == metrics.serialize().size()); - minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0); + minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); REQUIRE(3 == resp.children.size()); - minifi::state::metrics::MetricResponse running = resp.children.at(0); + minifi::state::response::SerializedResponseNode running = resp.children.at(0); REQUIRE("running" == running.name); - REQUIRE("1" == running.value); + REQUIRE("true" == running.value.to_string()); - minifi::state::metrics::MetricResponse full = resp.children.at(1); + minifi::state::response::SerializedResponseNode full = resp.children.at(1); REQUIRE("full" == full.name); - REQUIRE("1" == full.value); + REQUIRE("true" == full.value.to_string()); - minifi::state::metrics::MetricResponse size = resp.children.at(2); + minifi::state::response::SerializedResponseNode size = resp.children.at(2); REQUIRE("size" == size.name); - REQUIRE("0" == size.value); + REQUIRE("0" == size.value.to_string()); } repo->stop(); @@ -205,23 +205,23 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { { REQUIRE(1 == metrics.serialize().size()); - minifi::state::metrics::MetricResponse resp = metrics.serialize().at(0); + minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); REQUIRE(3 == resp.children.size()); - minifi::state::metrics::MetricResponse running = resp.children.at(0); + minifi::state::response::SerializedResponseNode running = resp.children.at(0); REQUIRE("running" == running.name); - REQUIRE("0" == running.value); + REQUIRE("false" == running.value.to_string()); - minifi::state::metrics::MetricResponse full = resp.children.at(1); + minifi::state::response::SerializedResponseNode full = resp.children.at(1); REQUIRE("full" == full.name); - REQUIRE("1" == full.value); + REQUIRE("true" == full.value); - minifi::state::metrics::MetricResponse size = resp.children.at(2); + minifi::state::response::SerializedResponseNode size = resp.children.at(2); REQUIRE("size" == size.name); REQUIRE("0" == size.value); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/test/unit/ControllerTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp index c761b6e..49cb759 100644 --- a/libminifi/test/unit/ControllerTests.cpp +++ b/libminifi/test/unit/ControllerTests.cpp @@ -136,7 +136,7 @@ class TestUpdateSink : public minifi::state::StateMonitor { * < 0 is an error code * 0 is success */ - virtual int16_t applyUpdate(const std::string &configuration) { + virtual int16_t applyUpdate(const std::string &source, const std::string &configuration) { update_calls++; return 0; } @@ -145,7 +145,7 @@ class TestUpdateSink : public minifi::state::StateMonitor { * Apply an update that the agent must decode. This is useful for certain operations * that can't be encapsulated within these definitions. */ - virtual int16_t applyUpdate(const std::shared_ptr<minifi::state::Update> &updateController) { + virtual int16_t applyUpdate(const std::string &source, const std::shared_ptr<minifi::state::Update> &updateController) { return 0; }
