fgerlits commented on a change in pull request #1170: URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r717317315
########## File path: extensions/standard-processors/processors/ReplaceText.cpp ########## @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ReplaceText.h" + +#include <algorithm> +#include <vector> + +#include "core/Resource.h" +#include "core/TypedValues.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/LineByLineInputOutputStreamCallback.h" + +namespace org::apache::nifi::minifi::processors { + +const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode") + ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or " + "against the whole input treated as a single string (Entire Text).") + ->isRequired(true) + ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE)) + ->withAllowableValues(EvaluationModeType::values()) + ->build(); + +const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode") + ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, " + "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).") + ->isRequired(false) + ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL)) + ->withAllowableValues(LineByLineEvaluationModeType::values()) + ->build(); + +const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy") + ->withDescription("The strategy for how and what to replace within the FlowFile's text content. " + "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value " + "(if an attribute is not found, the placeholder is kept as it was).") + ->isRequired(true) + ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE)) + ->withAllowableValues(ReplacementStrategyType::values()) + ->build(); + +const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value") + ->withDescription("The Search Value to search for in the FlowFile content. " + "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. " + "Supports expression language except in Regex Replace mode.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build(); + +const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value") + ->withDescription("The value to insert using the 'Replacement Strategy'. " + "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: " + "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. " + "Back-references to non-existent capturing groups will be replaced by empty strings. " + "Supports expression language except in Regex Replace mode.") + ->isRequired(true) + ->supportsExpressionLanguage(true) + ->build(); + +const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. " + "This includes both FlowFiles that had text replaced and those that did not."); +const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship."); + +ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& uuid) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory<ReplaceText>::getLogger()) { +} + +void ReplaceText::initialize() { + setSupportedProperties({ + EvaluationMode, + LineByLineEvaluationMode, + ReplacementStrategy, + SearchValue, + ReplacementValue + }); + setSupportedRelationships({ + Success, + Failure + }); +} + +void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context); + + const std::optional<std::string> evaluation_mode = context->getProperty(EvaluationMode); + evaluation_mode_ = EvaluationModeType::parse(evaluation_mode.value().c_str()); + logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), evaluation_mode_.toString()); + + const std::optional<std::string> line_by_line_evaluation_mode = context->getProperty(LineByLineEvaluationMode); + if (line_by_line_evaluation_mode) { + line_by_line_evaluation_mode_ = LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str()); + logger_->log_debug("the %s property is set to %s", LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString()); + } + + const std::optional<std::string> replacement_strategy = context->getProperty(ReplacementStrategy); + replacement_strategy_ = ReplacementStrategyType::parse(replacement_strategy.value().c_str()); + logger_->log_debug("the %s property is set to %s", ReplacementStrategy.getName(), replacement_strategy_.toString()); +} + +void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { + gsl_Expects(context); + gsl_Expects(session); + + std::shared_ptr<core::FlowFile> flow_file = session->get(); + if (!flow_file) { + logger_->log_trace("No flow file"); + yield(); + return; + } + + Parameters parameters = readParameters(context, flow_file); + + switch (evaluation_mode_.value()) { + case EvaluationModeType::ENTIRE_TEXT: + replaceTextInEntireFile(flow_file, session, parameters); + return; + case EvaluationModeType::LINE_BY_LINE: + replaceTextLineByLine(flow_file, session, parameters); + return; + } + + throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", evaluation_mode_.toString())}; +} + +ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::FlowFile>& flow_file) const { + Parameters parameters; + + bool found_search_value; + if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) { + found_search_value = context->getProperty(SearchValue.getName(), parameters.search_value_); + } else { + found_search_value = context->getProperty(SearchValue, parameters.search_value_, flow_file); + } + if (found_search_value) { + logger_->log_debug("the %s property is set to %s", SearchValue.getName(), parameters.search_value_); + if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) { + parameters.search_regex_ = std::regex{parameters.search_value_}; + } + } + if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && parameters.search_value_.empty()) { + throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: missing or empty ", SearchValue.getName(), " property")}; + } + + bool found_replacement_value; + if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) { + found_replacement_value = context->getProperty(ReplacementValue.getName(), parameters.replacement_value_); + } else { + found_replacement_value = context->getProperty(ReplacementValue, parameters.replacement_value_, flow_file); + } + if (found_replacement_value) { + logger_->log_debug("the %s property is set to %s", ReplacementValue.getName(), parameters.replacement_value_); + } else { + throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Missing required property: ", ReplacementValue.getName())}; + } + + if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) { + auto [chomped_value, line_ending] = utils::StringUtils::chomp(parameters.replacement_value_); + parameters.replacement_value_ = std::move(chomped_value); + } + + return parameters; +} + +namespace { + +struct ReadFlowFileIntoBuffer : public InputStreamCallback { + std::vector<uint8_t> buffer_; + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + size_t bytes_read = stream->read(buffer_, stream->size()); + return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read); + } +}; + +struct WriteBufferToFlowFile : public OutputStreamCallback { + const std::vector<uint8_t>& buffer_; + + explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {} + + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + size_t bytes_written = stream->write(buffer_, buffer_.size()); + return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written); + } +}; + +} // namespace + +void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const { + gsl_Expects(flow_file); + gsl_Expects(session); + + try { + ReadFlowFileIntoBuffer read_callback; + session->read(flow_file, &read_callback); + + std::string input{read_callback.buffer_.begin(), read_callback.buffer_.end()}; + std::string output = applyReplacements(input, flow_file, parameters); + std::vector<uint8_t> modified_text{output.begin(), output.end()}; + + WriteBufferToFlowFile write_callback{modified_text}; + session->write(flow_file, &write_callback); + + session->transfer(flow_file, Success); + } catch (const Exception& exception) { + logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what()); + session->transfer(flow_file, Failure); + } +} + +void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const { + gsl_Expects(flow_file); + gsl_Expects(session); + + try { + utils::LineByLineInputOutputStreamCallback read_write_callback{[this, &flow_file, ¶meters](const std::string& input_line, bool is_first_line, bool is_last_line) { + switch (line_by_line_evaluation_mode_.value()) { + case LineByLineEvaluationModeType::ALL: + return applyReplacements(input_line, flow_file, parameters); + case LineByLineEvaluationModeType::FIRST_LINE: + return is_first_line ? applyReplacements(input_line, flow_file, parameters) : input_line; + case LineByLineEvaluationModeType::LAST_LINE: + return is_last_line ? applyReplacements(input_line, flow_file, parameters) : input_line; + case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE: + return is_first_line ? input_line : applyReplacements(input_line, flow_file, parameters); + case LineByLineEvaluationModeType::EXCEPT_LAST_LINE: + return is_last_line ? input_line: applyReplacements(input_line, flow_file, parameters); + } + throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())}; + }}; + session->readWrite(flow_file, &read_write_callback); + session->transfer(flow_file, Success); + } catch (const Exception& exception) { + logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what()); + session->transfer(flow_file, Failure); + } +} + +std::string ReplaceText::applyReplacements(const std::string& input, const std::shared_ptr<core::FlowFile>& flow_file, const Parameters& parameters) const { + const auto [chomped_input, line_ending] = utils::StringUtils::chomp(input); + + switch (replacement_strategy_.value()) { + case ReplacementStrategyType::PREPEND: + return parameters.replacement_value_ + input; + + case ReplacementStrategyType::APPEND: + return chomped_input + parameters.replacement_value_ + line_ending; + + case ReplacementStrategyType::REGEX_REPLACE: + return std::regex_replace(chomped_input, parameters.search_regex_, parameters.replacement_value_) + line_ending; + + case ReplacementStrategyType::LITERAL_REPLACE: + return applyLiteralReplace(chomped_input, parameters) + line_ending; + + case ReplacementStrategyType::ALWAYS_REPLACE: + return parameters.replacement_value_ + line_ending; + + case ReplacementStrategyType::SUBSTITUTE_VARIABLES: + return applySubstituteVariables(chomped_input, flow_file) + line_ending; + } + + throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": ", replacement_strategy_.toString())}; +} + +std::string ReplaceText::applyLiteralReplace(const std::string& input, const Parameters& parameters) const { Review comment: I think pretty patterns are useful, too, as they improve readability. Also, I suspect it won't take long before cpplint and clang-tidy disagree about something and then we'll be stuck. But okay: https://github.com/apache/nifi-minifi-cpp/pull/1170/commits/4b1476c18104f4fa9de3994c9b366c284177e779. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
