szaszm commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r712829713
##########
File path: libminifi/include/io/StreamPipe.h
##########
@@ -42,6 +42,11 @@ class OutputStreamCallback {
virtual ~OutputStreamCallback() = default;
virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
};
+class InputOutputStreamCallback {
+ public:
+ virtual ~InputOutputStreamCallback() = default;
+ virtual int64_t process(const std::shared_ptr<io::BaseStream>& input, const
std::shared_ptr<io::BaseStream>& output) = 0;
+};
Review comment:
Instead of following the same flawed pattern of the past, I prefer doing
things the C++ way, which in this case means passing function objects
directly (potentially type-erased using std::function) instead of requiring
them to inherit from a *Callback base class.
LineByLineInputOutputStreamCallback could still exist, but `process` would
need to become `operator()` and it would no longer need to be a subclass.
##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode =
core::PropertyBuilder::createProperty("Evaluation Mode")
+ ->withDescription("Run the 'Replacement Strategy' against each line
separately (Line-by-Line) or "
+ "against the whole input treated as a single string
(Entire Text).")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+ ->withAllowableValues(EvaluationModeType::values())
+ ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode =
core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+ ->withDescription("Run the 'Replacement Strategy' against each line
separately (Line-by-Line) for All lines in the FlowFile, "
+ "First Line (Header) only, Last Line (Footer) only, all
Except the First Line (Header) or all Except the Last Line (Footer).")
+ ->isRequired(false)
+
->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+ ->withAllowableValues(LineByLineEvaluationModeType::values())
+ ->build();
+
+const core::Property ReplaceText::ReplacementStrategy =
core::PropertyBuilder::createProperty("Replacement Strategy")
+ ->withDescription("The strategy for how and what to replace within the
FlowFile's text content. "
+ "Substitute Variables replaces ${attribute_name}
placeholders with the corresponding attribute's value "
+ "(if an attribute is not found, the placeholder is kept
as it was).")
+ ->isRequired(true)
+ ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+ ->withAllowableValues(ReplacementStrategyType::values())
+ ->build();
+
+const core::Property ReplaceText::SearchValue =
core::PropertyBuilder::createProperty("Search Value")
+ ->withDescription("The Search Value to search for in the FlowFile content.
"
+ "Only used for 'Literal Replace' and 'Regex Replace'
matching strategies. "
+ "Supports expression language except in Regex Replace
mode.")
+ ->isRequired(false)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property ReplaceText::ReplacementValue =
core::PropertyBuilder::createProperty("Replacement Value")
+ ->withDescription("The value to insert using the 'Replacement Strategy'. "
+ "Using 'Regex Replace' back-references to Regular
Expression capturing groups are supported: "
+ "$& is the entire matched substring, $1, $2, ... are the
matched capturing groups. Use $$1 for a literal $1. "
+ "Back-references to non-existent capturing groups will
be replaced by empty strings. "
+ "Supports expression language except in Regex Replace
mode.")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have
been successfully processed are routed to this relationship. "
+ "This includes both
FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could
not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier&
uuid)
+ : core::Processor(name, uuid),
+ logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+ setSupportedProperties({
+ EvaluationMode,
+ LineByLineEvaluationMode,
+ ReplacementStrategy,
+ SearchValue,
+ ReplacementValue
+ });
+ setSupportedRelationships({
+ Success,
+ Failure
+ });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+ gsl_Expects(context);
+
+ const std::optional<std::string> evaluation_mode =
context->getProperty(EvaluationMode);
+ evaluation_mode_ =
EvaluationModeType::parse(evaluation_mode.value().c_str());
+ logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(),
evaluation_mode_.toString());
+
+ const std::optional<std::string> line_by_line_evaluation_mode =
context->getProperty(LineByLineEvaluationMode);
+ if (line_by_line_evaluation_mode) {
+ line_by_line_evaluation_mode_ =
LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+ logger_->log_debug("the %s property is set to %s",
LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+ }
+
+ const std::optional<std::string> replacement_strategy =
context->getProperty(ReplacementStrategy);
+ replacement_strategy_ =
ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+ logger_->log_debug("the %s property is set to %s",
ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context);
+ gsl_Expects(session);
+
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
+ if (!flow_file) {
+ logger_->log_trace("No flow file");
+ yield();
+ return;
+ }
+
+ Parameters parameters = readParameters(context, flow_file);
+
+ switch (evaluation_mode_.value()) {
+ case EvaluationModeType::ENTIRE_TEXT:
+ replaceTextInEntireFile(flow_file, session, parameters);
+ return;
+ case EvaluationModeType::LINE_BY_LINE:
+ replaceTextLineByLine(flow_file, session, parameters);
+ return;
+ }
+
+ throw Exception{PROCESSOR_EXCEPTION,
utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ",
evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::FlowFile>& flow_file) const {
+ Parameters parameters;
+
+ bool found_search_value;
+ if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+ found_search_value = context->getProperty(SearchValue.getName(),
parameters.search_value_);
+ } else {
+ found_search_value = context->getProperty(SearchValue,
parameters.search_value_, flow_file);
+ }
+ if (found_search_value) {
+ logger_->log_debug("the %s property is set to %s", SearchValue.getName(),
parameters.search_value_);
+ if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+ parameters.search_regex_ = std::regex{parameters.search_value_};
+ }
+ }
+ if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE ||
replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) &&
parameters.search_value_.empty()) {
+ throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error:
missing or empty ", SearchValue.getName(), " property")};
+ }
+
+ bool found_replacement_value;
+ if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+ found_replacement_value = context->getProperty(ReplacementValue.getName(),
parameters.replacement_value_);
+ } else {
+ found_replacement_value = context->getProperty(ReplacementValue,
parameters.replacement_value_, flow_file);
+ }
+ if (found_replacement_value) {
+ logger_->log_debug("the %s property is set to %s",
ReplacementValue.getName(), parameters.replacement_value_);
+ } else {
+ throw Exception{PROCESSOR_EXCEPTION,
utils::StringUtils::join_pack("Missing required property: ",
ReplacementValue.getName())};
+ }
+
+ if (evaluation_mode_ == EvaluationModeType::LINE_BY_LINE) {
+ const auto [chomped_value, line_ending] =
utils::StringUtils::chomp(parameters.replacement_value_);
+ parameters.replacement_value_ = chomped_value;
Review comment:
Consider moving this string to avoid an extra allocation.
```suggestion
auto [chomped_value, line_ending] =
utils::StringUtils::chomp(parameters.replacement_value_);
parameters.replacement_value_ = std::move(chomped_value);
```
##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/Resource.h"
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode =
core::PropertyBuilder::createProperty("Evaluation Mode")
+ ->withDescription("Run the 'Replacement Strategy' against each line
separately (Line-by-Line) or "
+ "against the whole input treated as a single string
(Entire Text).")
+ ->isRequired(true)
+ ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+ ->withAllowableValues(EvaluationModeType::values())
+ ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode =
core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+ ->withDescription("Run the 'Replacement Strategy' against each line
separately (Line-by-Line) for All lines in the FlowFile, "
+ "First Line (Header) only, Last Line (Footer) only, all
Except the First Line (Header) or all Except the Last Line (Footer).")
+ ->isRequired(false)
+
->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+ ->withAllowableValues(LineByLineEvaluationModeType::values())
+ ->build();
+
+const core::Property ReplaceText::ReplacementStrategy =
core::PropertyBuilder::createProperty("Replacement Strategy")
+ ->withDescription("The strategy for how and what to replace within the
FlowFile's text content. "
+ "Substitute Variables replaces ${attribute_name}
placeholders with the corresponding attribute's value "
+ "(if an attribute is not found, the placeholder is kept
as it was).")
+ ->isRequired(true)
+ ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+ ->withAllowableValues(ReplacementStrategyType::values())
+ ->build();
+
+const core::Property ReplaceText::SearchValue =
core::PropertyBuilder::createProperty("Search Value")
+ ->withDescription("The Search Value to search for in the FlowFile content.
"
+ "Only used for 'Literal Replace' and 'Regex Replace'
matching strategies. "
+ "Supports expression language except in Regex Replace
mode.")
+ ->isRequired(false)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Property ReplaceText::ReplacementValue =
core::PropertyBuilder::createProperty("Replacement Value")
+ ->withDescription("The value to insert using the 'Replacement Strategy'. "
+ "Using 'Regex Replace' back-references to Regular
Expression capturing groups are supported: "
+ "$& is the entire matched substring, $1, $2, ... are the
matched capturing groups. Use $$1 for a literal $1. "
+ "Back-references to non-existent capturing groups will
be replaced by empty strings. "
+ "Supports expression language except in Regex Replace
mode.")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have
been successfully processed are routed to this relationship. "
+ "This includes both
FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could
not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier&
uuid)
+ : core::Processor(name, uuid),
+ logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+void ReplaceText::initialize() {
+ setSupportedProperties({
+ EvaluationMode,
+ LineByLineEvaluationMode,
+ ReplacementStrategy,
+ SearchValue,
+ ReplacementValue
+ });
+ setSupportedRelationships({
+ Success,
+ Failure
+ });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+ gsl_Expects(context);
+
+ const std::optional<std::string> evaluation_mode =
context->getProperty(EvaluationMode);
+ evaluation_mode_ =
EvaluationModeType::parse(evaluation_mode.value().c_str());
+ logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(),
evaluation_mode_.toString());
+
+ const std::optional<std::string> line_by_line_evaluation_mode =
context->getProperty(LineByLineEvaluationMode);
+ if (line_by_line_evaluation_mode) {
+ line_by_line_evaluation_mode_ =
LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+ logger_->log_debug("the %s property is set to %s",
LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+ }
+
+ const std::optional<std::string> replacement_strategy =
context->getProperty(ReplacementStrategy);
+ replacement_strategy_ =
ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+ logger_->log_debug("the %s property is set to %s",
ReplacementStrategy.getName(), replacement_strategy_.toString());
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>&
context, const std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context);
+ gsl_Expects(session);
+
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
+ if (!flow_file) {
+ logger_->log_trace("No flow file");
+ yield();
+ return;
+ }
+
+ Parameters parameters = readParameters(context, flow_file);
+
+ switch (evaluation_mode_.value()) {
+ case EvaluationModeType::ENTIRE_TEXT:
+ replaceTextInEntireFile(flow_file, session, parameters);
+ return;
+ case EvaluationModeType::LINE_BY_LINE:
+ replaceTextLineByLine(flow_file, session, parameters);
+ return;
+ }
+
+ throw Exception{PROCESSOR_EXCEPTION,
utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ",
evaluation_mode_.toString())};
+}
+
+ReplaceText::Parameters ReplaceText::readParameters(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::FlowFile>& flow_file) const {
+ Parameters parameters;
+
+ bool found_search_value;
+ if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+ found_search_value = context->getProperty(SearchValue.getName(),
parameters.search_value_);
+ } else {
+ found_search_value = context->getProperty(SearchValue,
parameters.search_value_, flow_file);
+ }
+ if (found_search_value) {
+ logger_->log_debug("the %s property is set to %s", SearchValue.getName(),
parameters.search_value_);
+ if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+ parameters.search_regex_ = std::regex{parameters.search_value_};
+ }
+ }
Review comment:
Can we do anything to avoid compiling a regex on each flow file? I
haven't measured it, but I suspect it could be a significant performance bottleneck.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]