adamdebreceni commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705980315
########## File path: extensions/standard-processors/processors/ReplaceText.cpp ########## @@ -0,0 +1,359 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ReplaceText.h" + +#include <algorithm> +#include <vector> + +#include "core/TypedValues.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/LineByLineInputOutputStreamCallback.h" + +namespace org::apache::nifi::minifi::processors { + +const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode") + ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or " + "buffer the entire file into memory (Entire Text) and run against that.") + ->isRequired(true) + ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE)) + ->withAllowableValues(EvaluationModeType::values()) + ->build(); + +const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode") + ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, " + "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all 
Except the Last Line (Footer).") + ->isRequired(false) + ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL)) + ->withAllowableValues(LineByLineEvaluationModeType::values()) + ->build(); + +const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy") + ->withDescription("The strategy for how and what to replace within the FlowFile's text content. " + "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value " + "(if an attribute is not found, the placeholder is kept as it was).") + ->isRequired(true) + ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE)) + ->withAllowableValues(ReplacementStrategyType::values()) + ->build(); + +const core::Property ReplaceText::MaximumBufferSize = core::PropertyBuilder::createProperty("Maximum Buffer Size") + ->withDescription("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) " + "in order to apply the replacement. " + "In 'Entire Text' evaluation mode, if the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. " + "In 'Line-by-Line' evaluation mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. " + "A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' mode, a value such as 8 KB or 16 KB is suggested. ") + ->isRequired(false) + ->withDefaultValue<core::DataSizeValue>("1 MB") + ->build(); + +const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value") + ->withDescription("The Search Value to search for in the FlowFile content. " + "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. 
" + "Supports expression language except in Regex Replace mode.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build(); + +const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value") + ->withDescription("The value to insert using the 'Replacement Strategy'. " + "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: " + "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. " + "Back-references to non-existent capturing groups will be replaced by empty strings. " + "Supports expression language except in Regex Replace mode.") + ->isRequired(true) + ->supportsExpressionLanguage(true) + ->build(); + +const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. " + "This includes both FlowFiles that had text replaced and those that did not."); +const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship."); + +ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory<ReplaceText>::getLogger()) { +} + +core::annotation::Input ReplaceText::getInputRequirement() const { + return core::annotation::Input::INPUT_REQUIRED; +} + +void ReplaceText::initialize() { + setSupportedProperties({ + SearchValue, + ReplacementValue, + MaximumBufferSize, + ReplacementStrategy, + EvaluationMode, + LineByLineEvaluationMode + }); + setSupportedRelationships({ + Success, + Failure + }); +} + +void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + + const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode); + evaluation_mode_ = 
EvaluationModeType::parse(evaluation_mode.value().c_str()); + logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString()); + + const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode); + if (line_by_line_evaluation_mode) { + line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str()); + logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString()); + } + + const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy); + replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str()); + logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString()); + + context->getProperty(MaximumBufferSize.getName(), maximum_buffer_size_); + logger_->log_debug("the %s property is set to %" PRIu64 " bytes", MaximumBufferSize.getName(), maximum_buffer_size_); +} + +void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context); + gsl_Expects(session); + + std::shared_ptr<core::FlowFile> flow_file = session->get(); + if (!flow_file) { + logger_->log_trace("No flow file"); + return; + } + + readSearchValueProperty(context, flow_file); + readReplacementValueProperty(context, flow_file); + + switch (evaluation_mode_.value()) { + case EvaluationModeType::ENTIRE_TEXT: + replaceTextInEntireFile(flow_file, session); + return; + case EvaluationModeType::LINE_BY_LINE: + replaceTextLineByLine(flow_file, session); + return; + } + + throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())}; +} + +void ReplaceText::readSearchValueProperty(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::FlowFile>& flow_file) { + bool found_search_value; + if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) { + found_search_value = context->getProperty(SearchValue.getName(), search_value_); Review comment: Won't writing to the members (`search_value_`, `search_regex_`) interfere with other threads running this same processor? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
