lordgamez commented on a change in pull request #1137: URL: https://github.com/apache/nifi-minifi-cpp/pull/1137#discussion_r685967212
########## File path: extensions/standard-processors/processors/AttributesToJSON.h ########## @@ -0,0 +1,103 @@ +/** + * @file AttributesToJSON.h + * AttributesToJSON class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <vector> +#include <string> +#include <set> +#include <unordered_set> +#include <memory> +#include <map> +#include <regex> + +#include "rapidjson/document.h" +#include "core/Processor.h" +#include "core/Property.h" +#include "core/logging/Logger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +class AttributesToJSON : public core::Processor { + public: + static const std::set<std::string> DESTINATIONS; + + explicit AttributesToJSON(const std::string& name, const utils::Identifier& uuid = {}) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory<AttributesToJSON>::getLogger()), + core_attributes_(core::SpecialFlowAttribute::getSpecialFlowAttributes()) { + } + static constexpr char const* ProcessorName = "AttributesToJSON"; Review comment: Updated in 4e8f82bc2e1f7997a7d51dab9ff413fc0a1d0e13 ########## File path: extensions/standard-processors/processors/AttributesToJSON.h ########## @@ -0,0 +1,103 @@ +/** + * @file AttributesToJSON.h + * AttributesToJSON class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <vector> +#include <string> +#include <set> +#include <unordered_set> +#include <memory> +#include <map> +#include <regex> + +#include "rapidjson/document.h" +#include "core/Processor.h" +#include "core/Property.h" +#include "core/logging/Logger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +class AttributesToJSON : public core::Processor { + public: + static const std::set<std::string> DESTINATIONS; + + explicit AttributesToJSON(const std::string& name, const utils::Identifier& uuid = {}) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory<AttributesToJSON>::getLogger()), + core_attributes_(core::SpecialFlowAttribute::getSpecialFlowAttributes()) { + } + static constexpr char const* ProcessorName = "AttributesToJSON"; + // Supported Properties + static const core::Property AttributesList; + static const core::Property AttributesRegularExpression; + static const core::Property Destination; + static const core::Property IncludeCoreAttributes; + static const core::Property NullValue; + + // Supported Relationships + static core::Relationship Success; + + void initialize() override; + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* sessionFactory) override; + void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; + + core::annotation::Input getInputRequirement() const override { + return core::annotation::Input::INPUT_REQUIRED; + } + + private: + class WriteCallback : public OutputStreamCallback { + public: + explicit WriteCallback(const std::string& json_data) : json_data_(json_data) {} + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(json_data_.data()), json_data_.length()); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); + } + private: + std::string json_data_; + }; + + bool isCoreAttributeToBeFiltered(const std::string& attribute) const; + bool matchesAttributeRegex(const std::string& attribute); + void addAttributeToJson(rapidjson::Document& document, const std::string& key, const std::string& value); + std::string buildAttributeJsonData(std::map<std::string, std::string>&& attributes); + + std::shared_ptr<logging::Logger> logger_; + const std::unordered_set<std::string> core_attributes_; + std::vector<std::string> attribute_list_; + std::string attributes_regular_expression_str_; + std::regex attributes_regular_expression_; + bool write_to_attribute_ = true; Review comment: Updated in 4e8f82bc2e1f7997a7d51dab9ff413fc0a1d0e13 ########## File path: extensions/standard-processors/processors/AttributesToJSON.h ########## @@ -0,0 +1,103 @@ +/** + * @file AttributesToJSON.h + * AttributesToJSON class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <vector> +#include <string> +#include <set> +#include <unordered_set> +#include <memory> +#include <map> +#include <regex> + +#include "rapidjson/document.h" +#include "core/Processor.h" +#include "core/Property.h" +#include "core/logging/Logger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +class AttributesToJSON : public core::Processor { + public: + static const std::set<std::string> DESTINATIONS; + + explicit AttributesToJSON(const std::string& name, const utils::Identifier& uuid = {}) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory<AttributesToJSON>::getLogger()), + core_attributes_(core::SpecialFlowAttribute::getSpecialFlowAttributes()) { + } + static constexpr char const* ProcessorName = "AttributesToJSON"; + // Supported Properties + static const core::Property AttributesList; + static const core::Property AttributesRegularExpression; + static const core::Property Destination; + static const core::Property IncludeCoreAttributes; + static const core::Property NullValue; + + // Supported Relationships + static core::Relationship Success; + + void initialize() override; + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* sessionFactory) override; + void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; + + core::annotation::Input getInputRequirement() const override { + return core::annotation::Input::INPUT_REQUIRED; + } + + private: + class WriteCallback : public OutputStreamCallback { + public: + explicit WriteCallback(const std::string& json_data) : json_data_(json_data) {} + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(json_data_.data()), json_data_.length()); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); + } + private: + std::string json_data_; + }; + + bool isCoreAttributeToBeFiltered(const std::string& attribute) const; + bool matchesAttributeRegex(const std::string& attribute); + void addAttributeToJson(rapidjson::Document& document, const std::string& key, const std::string& value); + std::string buildAttributeJsonData(std::map<std::string, std::string>&& attributes); + + std::shared_ptr<logging::Logger> logger_; + const std::unordered_set<std::string> core_attributes_; + std::vector<std::string> attribute_list_; + std::string attributes_regular_expression_str_; + std::regex attributes_regular_expression_; Review comment: Updated in 4e8f82bc2e1f7997a7d51dab9ff413fc0a1d0e13 ########## File path: extensions/standard-processors/processors/AttributesToJSON.cpp ########## @@ -0,0 +1,162 @@ +/** + * @file AttributesToJSON.cpp + * AttributesToJSON class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AttributesToJSON.h" + +#include "rapidjson/writer.h" +#include "utils/StringUtils.h" +#include "utils/ProcessorConfigUtils.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +const std::set<std::string> AttributesToJSON::DESTINATIONS({"flowfile-attribute", "flowfile-content"}); + +const core::Property AttributesToJSON::AttributesList( + core::PropertyBuilder::createProperty("Attributes List") + ->withDescription("Comma separated list of attributes to be included in the resulting JSON. " + "If this value is left empty then all existing Attributes will be included. This list of attributes is case sensitive. " + "If an attribute specified in the list is not found it will be be emitted to the resulting JSON with an empty string or NULL value.") + ->build()); + +const core::Property AttributesToJSON::AttributesRegularExpression( + core::PropertyBuilder::createProperty("Attributes Regular Expression") + ->withDescription("Regular expression that will be evaluated against the flow file attributes to select the matching attributes. " + "This property can be used in combination with the attributes list property.") + ->build()); + +const core::Property AttributesToJSON::Destination( + core::PropertyBuilder::createProperty("Destination") + ->withDescription("Control if JSON value is written as a new flowfile attribute 'JSONAttributes' or written in the flowfile content. " + "Writing to flowfile content will overwrite any existing flowfile content.") + ->isRequired(true) + ->withDefaultValue<std::string>("flowfile-attribute") + ->withAllowableValues<std::string>(DESTINATIONS) + ->build()); + +const core::Property AttributesToJSON::IncludeCoreAttributes( + core::PropertyBuilder::createProperty("Include Core Attributes") + ->withDescription("Determines if the FlowFile core attributes which are contained in every FlowFile should be included in the final JSON value generated.") + ->isRequired(true) + ->withDefaultValue<bool>(true) + ->build()); + +const core::Property AttributesToJSON::NullValue( + core::PropertyBuilder::createProperty("Null Value") + ->withDescription("If true a non existing or empty attribute will be NULL in the resulting JSON. If false an empty string will be placed in the JSON.") + ->isRequired(true) + ->withDefaultValue<bool>(false) + ->build()); + +core::Relationship AttributesToJSON::Success("success", "All FlowFiles received are routed to success"); + +void AttributesToJSON::initialize() { + setSupportedProperties({ + AttributesList, + AttributesRegularExpression, + Destination, + IncludeCoreAttributes, + NullValue + }); + setSupportedRelationships({Success}); +} + +void AttributesToJSON::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*sessionFactory*/) { + std::string attributes; + context->getProperty(AttributesList.getName(), attributes); + attribute_list_ = utils::StringUtils::splitRemovingEmpty(attributes, ","); + context->getProperty(AttributesRegularExpression.getName(), attributes_regular_expression_str_); + if (!attributes_regular_expression_str_.empty()) { + attributes_regular_expression_ = std::regex(attributes_regular_expression_str_); + } + write_to_attribute_ = utils::parsePropertyWithAllowableValuesOrThrow(*context, Destination.getName(), DESTINATIONS) == "flowfile-attribute"; + context->getProperty(IncludeCoreAttributes.getName(), include_core_attributes_); + context->getProperty(NullValue.getName(), null_value_); +} + +bool AttributesToJSON::isCoreAttributeToBeFiltered(const std::string& attribute) const { + return !include_core_attributes_ && core_attributes_.find(attribute) != core_attributes_.end(); +} + +bool AttributesToJSON::matchesAttributeRegex(const std::string& attribute) { + return attributes_regular_expression_str_.empty() || std::regex_search(attribute, attributes_regular_expression_); +} + +void AttributesToJSON::addAttributeToJson(rapidjson::Document& document, const std::string& key, const std::string& value) { + if (isCoreAttributeToBeFiltered(key)) { + logger_->log_debug("Core attribute '%s' will not be included in the attributes JSON.", key); + return; + } + if (!matchesAttributeRegex(key)) { + logger_->log_debug("Attribute '%s' does not match the set regex, therefore it will not be included in the attributes JSON.", key); + return; + } + rapidjson::Value json_key(key.c_str(), document.GetAllocator()); + rapidjson::Value json_val; + if (!value.empty() || !null_value_) { + json_val.SetString(value.c_str(), document.GetAllocator()); + } + document.AddMember(json_key, json_val, document.GetAllocator()); +} + +std::string AttributesToJSON::buildAttributeJsonData(std::map<std::string, std::string>&& attributes) { + auto root = rapidjson::Document(rapidjson::kObjectType); + if (!attribute_list_.empty()) { + for (const auto& attribute : attribute_list_) { + addAttributeToJson(root, attribute, attributes[attribute]); + } + } else { + for (const auto& kvp : attributes) { Review comment: Updated in 4e8f82bc2e1f7997a7d51dab9ff413fc0a1d0e13 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
