This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 525bfe0 NIFI-8474: Added new Replacement Strategy for variable
substitution in ReplaceText.
525bfe0 is described below
commit 525bfe00b3858d9836c736d5c60acce3c1e13239
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Mon Apr 26 21:30:17 2021 +0200
NIFI-8474: Added new Replacement Strategy for variable substitution in
ReplaceText.
Signed-off-by: Pierre Villard <[email protected]>
This closes #5032.
---
.../nifi/processors/standard/ReplaceText.java | 46 +++++++++++++++++++++-
.../nifi/processors/standard/TestReplaceText.java | 44 +++++++++++++++++++++
2 files changed, 89 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index fb0cbb8..7d5eae4 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.text.StringSubstitutor;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -95,6 +96,7 @@ public class ReplaceText extends AbstractProcessor {
public static final String regexReplaceValue = "Regex Replace";
public static final String literalReplaceValue = "Literal Replace";
public static final String alwaysReplace = "Always Replace";
+ public static final String SUBSTITUTE_VARIABLES_VALUE = "Substitute
Variables";
private static final Pattern unescapedBackReferencePattern =
Pattern.compile("[^\\\\]\\$(\\d+)");
private static final String DEFAULT_REGEX = "(?s)(^.*$)";
private static final String DEFAULT_REPLACEMENT_VALUE = "$1";
@@ -120,6 +122,9 @@ public class ReplaceText extends AbstractProcessor {
static final AllowableValue ALWAYS_REPLACE = new
AllowableValue(alwaysReplace, alwaysReplace,
"Always replaces the entire line or the entire contents of the
FlowFile (depending on the value of the <Evaluation Mode> property) and does
not bother searching "
+ "for any value. When this strategy is chosen, the <Search Value>
property is ignored.");
+ static final AllowableValue SUBSTITUTE_VARIABLES = new
AllowableValue(SUBSTITUTE_VARIABLES_VALUE, SUBSTITUTE_VARIABLES_VALUE,
+ "Substitute variable references (specified in ${var} form) using
FlowFile attributes for looking up the replacement value by variable name. "
+ + "When this strategy is chosen, both the <Search Value>
and <Replacement Value> properties are ignored.");
public static final PropertyDescriptor SEARCH_VALUE = new
PropertyDescriptor.Builder()
.name("Regular Expression")
@@ -163,7 +168,7 @@ public class ReplaceText extends AbstractProcessor {
public static final PropertyDescriptor REPLACEMENT_STRATEGY = new
PropertyDescriptor.Builder()
.name("Replacement Strategy")
.description("The strategy for how and what to replace within the
FlowFile's text content.")
- .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE,
ALWAYS_REPLACE)
+ .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE,
ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
.defaultValue(REGEX_REPLACE.getValue())
.required(true)
.build();
@@ -246,6 +251,7 @@ public class ReplaceText extends AbstractProcessor {
case appendValue:
case prependValue:
case alwaysReplace:
+ case SUBSTITUTE_VARIABLES_VALUE:
default:
// nothing to check, search value is not used
break;
@@ -286,6 +292,9 @@ public class ReplaceText extends AbstractProcessor {
case alwaysReplace:
replacementStrategyExecutor = new AlwaysReplace();
break;
+ case SUBSTITUTE_VARIABLES_VALUE:
+ replacementStrategyExecutor = new SubstituteVariablesReplace();
+ break;
default:
throw new AssertionError();
}
@@ -652,6 +661,41 @@ public class ReplaceText extends AbstractProcessor {
}
}
+ private static class SubstituteVariablesReplace implements
ReplacementStrategyExecutor {
+ @Override
+ public FlowFile replace(FlowFile flowFile, final ProcessSession
session, final ProcessContext context, final String evaluateMode, final Charset
charset, final int maxBufferSize) {
+ final Map<String, String> flowFileAttributes =
flowFile.getAttributes();
+
+ if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
+ final int flowFileSize = (int) flowFile.getSize();
+ final int bufferSize = Math.min(maxBufferSize, flowFileSize);
+ final byte[] buffer = new byte[bufferSize];
+
+ flowFile = session.write(flowFile, new StreamCallback() {
+ @Override
+ public void process(final InputStream in, final
OutputStream out) throws IOException {
+ StreamUtils.fillBuffer(in, buffer, false);
+ final String originalContent = new String(buffer, 0,
flowFileSize, charset);
+ final String substitutedContent =
StringSubstitutor.replace(originalContent, flowFileAttributes);
+ out.write(substitutedContent.getBytes(charset));
+ }
+ });
+ } else {
+ flowFile = session.write(flowFile, new
StreamReplaceCallback(charset, maxBufferSize,
context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+ (bw, oneLine) -> {
+ final String substitutedLine =
StringSubstitutor.replace(oneLine, flowFileAttributes);
+ bw.write(substitutedLine);
+ }));
+ }
+ return flowFile;
+ }
+
+ @Override
+ public boolean isAllDataBufferedForEntireText() {
+ return true;
+ }
+ }
+
/**
* If we have a '$' followed by anything other than a number, then escape
* it. E.g., '$d' becomes '\$d' so that it can be used as a literal in a
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
index 341b75e..88ed097 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
@@ -1641,6 +1641,50 @@ public class TestReplaceText {
out.assertContentEquals("WO$1R$2D");
}
+ @Test
+ public void testSubstituteVariablesWithEvaluationModeEntireText() {
+ testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1:
foo\nLine2: bar", ReplaceText.ENTIRE_TEXT, createAttributesMap());
+ }
+
+ @Test
+ public void testSubstituteVariablesWithEvaluationModeLineByLine() {
+ testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1:
foo\nLine2: bar", ReplaceText.LINE_BY_LINE, createAttributesMap());
+ }
+
+ @Test
+ public void testSubstituteVariablesWhenVariableValueMissing() {
+ testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1:
${var1}\nLine2: ${var2}", ReplaceText.ENTIRE_TEXT, Collections.emptyMap());
+ }
+
+ @Test
+ public void testSubstituteVariablesWhenVariableReferenceEscaped() {
+ testSubstituteVariables("Line1: $${var1}\nLine2: $${var2}", "Line1:
${var1}\nLine2: ${var2}", ReplaceText.ENTIRE_TEXT, createAttributesMap());
+ }
+
+ @Test
+ public void testSubstituteVariablesWhenVariableNameEmpty() {
+ testSubstituteVariables("Line1: ${}\nLine2: ${}", "Line1: ${}\nLine2:
${}", ReplaceText.ENTIRE_TEXT, createAttributesMap());
+ }
+
+ private void testSubstituteVariables(String inputContent, String
expectedContent, String evaluationMode, Map<String, String> attributesMap) {
+ final TestRunner runner = getRunner();
+ runner.setProperty(ReplaceText.EVALUATION_MODE, evaluationMode);
+ runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY,
ReplaceText.SUBSTITUTE_VARIABLES.getValue());
+ runner.enqueue(inputContent, attributesMap);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
+ final MockFlowFile out =
runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
+ out.assertContentEquals(expectedContent);
+ }
+
+ private Map<String, String> createAttributesMap() {
+ final Map<String, String> attributesMap = new HashMap<>();
+ attributesMap.put("var1", "foo");
+ attributesMap.put("var2", "bar");
+ return attributesMap;
+ }
+
/*
* A repeated alternation regex such as (A|B)* can lead to
StackOverflowError
* on large input strings.