adamdebreceni commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r705980315



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReplaceText.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "core/TypedValues.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property ReplaceText::EvaluationMode = 
core::PropertyBuilder::createProperty("Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line 
separately (Line-by-Line) or "
+                      "buffer the entire file into memory (Entire Text) and 
run against that.")
+    ->isRequired(true)
+    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
+    ->withAllowableValues(EvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::LineByLineEvaluationMode = 
core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
+    ->withDescription("Run the 'Replacement Strategy' against each line 
separately (Line-by-Line) for All lines in the FlowFile, "
+                      "First Line (Header) only, Last Line (Footer) only, all 
Except the First Line (Header) or all Except the Last Line (Footer).")
+    ->isRequired(false)
+    
->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
+    ->withAllowableValues(LineByLineEvaluationModeType::values())
+    ->build();
+
+const core::Property ReplaceText::ReplacementStrategy = 
core::PropertyBuilder::createProperty("Replacement Strategy")
+    ->withDescription("The strategy for how and what to replace within the 
FlowFile's text content. "
+                      "Substitute Variables replaces ${attribute_name} 
placeholders with the corresponding attribute's value "
+                      "(if an attribute is not found, the placeholder is kept 
as it was).")
+    ->isRequired(true)
+    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
+    ->withAllowableValues(ReplacementStrategyType::values())
+    ->build();
+
+const core::Property ReplaceText::MaximumBufferSize = 
core::PropertyBuilder::createProperty("Maximum Buffer Size")
+    ->withDescription("Specifies the maximum amount of data to buffer (per 
file or per line, depending on the Evaluation Mode) "
+                      "in order to apply the replacement. "
+                      "In 'Entire Text' evaluation mode, if the FlowFile is 
larger than this value, the FlowFile will be routed to 'failure'. "
+                      "In 'Line-by-Line' evaluation mode, if a single line is 
larger than this value, the FlowFile will be routed to 'failure'. "
+                      "A default value of 1 MB is provided, primarily for 
'Entire Text' mode. In 'Line-by-Line' mode, a value such as 8 KB or 16 KB is 
suggested. ")
+    ->isRequired(false)
+    ->withDefaultValue<core::DataSizeValue>("1 MB")
+    ->build();
+
+const core::Property ReplaceText::SearchValue = 
core::PropertyBuilder::createProperty("Search Value")
+    ->withDescription("The Search Value to search for in the FlowFile content. 
"
+                      "Only used for 'Literal Replace' and 'Regex Replace' 
matching strategies. "
+                      "Supports expression language except in Regex Replace 
mode.")
+    ->isRequired(false)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ReplaceText::ReplacementValue = 
core::PropertyBuilder::createProperty("Replacement Value")
+    ->withDescription("The value to insert using the 'Replacement Strategy'. "
+                      "Using 'Regex Replace' back-references to Regular 
Expression capturing groups are supported: "
+                      "$& is the entire matched substring, $1, $2, ... are the 
matched capturing groups. Use $$1 for a literal $1. "
+                      "Back-references to non-existent capturing groups will 
be replaced by empty strings. "
+                      "Supports expression language except in Regex Replace 
mode.")
+    ->isRequired(true)
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Relationship ReplaceText::Success("success", "FlowFiles that have 
been successfully processed are routed to this relationship. "
+                                                         "This includes both 
FlowFiles that had text replaced and those that did not.");
+const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could 
not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& 
uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+core::annotation::Input ReplaceText::getInputRequirement() const {
+  return core::annotation::Input::INPUT_REQUIRED;
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      SearchValue,
+      ReplacementValue,
+      MaximumBufferSize,
+      ReplacementStrategy,
+      EvaluationMode,
+      LineByLineEvaluationMode
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = 
context->getProperty(EvaluationMode);
+  evaluation_mode_ = 
EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), 
evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = 
context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = 
LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", 
LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = 
context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = 
ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", 
ReplacementStrategy.getName(), replacement_strategy_.toString());
+
+  context->getProperty(MaximumBufferSize.getName(), maximum_buffer_size_);
+  logger_->log_debug("the %s property is set to %" PRIu64 " bytes", 
MaximumBufferSize.getName(), maximum_buffer_size_);
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    return;
+  }
+
+  readSearchValueProperty(context, flow_file);
+  readReplacementValueProperty(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, 
utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", 
evaluation_mode_.toString())};
+}
+
+void ReplaceText::readSearchValueProperty(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::FlowFile>& flow_file) {
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), 
search_value_);

Review comment:
       won't writing members (`search_value_`, `search_regex_`) interfere with 
other threads of this same processor?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to