This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 525bfe0  NIFI-8474: Added new Replacement Strategy for variable 
substitution in ReplaceText.
525bfe0 is described below

commit 525bfe00b3858d9836c736d5c60acce3c1e13239
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Mon Apr 26 21:30:17 2021 +0200

    NIFI-8474: Added new Replacement Strategy for variable substitution in 
ReplaceText.
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #5032.
---
 .../nifi/processors/standard/ReplaceText.java      | 46 +++++++++++++++++++++-
 .../nifi/processors/standard/TestReplaceText.java  | 44 +++++++++++++++++++++
 2 files changed, 89 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index fb0cbb8..7d5eae4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.text.StringSubstitutor;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -95,6 +96,7 @@ public class ReplaceText extends AbstractProcessor {
     public static final String regexReplaceValue = "Regex Replace";
     public static final String literalReplaceValue = "Literal Replace";
     public static final String alwaysReplace = "Always Replace";
+    public static final String SUBSTITUTE_VARIABLES_VALUE = "Substitute 
Variables";
     private static final Pattern unescapedBackReferencePattern = 
Pattern.compile("[^\\\\]\\$(\\d+)");
     private static final String DEFAULT_REGEX = "(?s)(^.*$)";
     private static final String DEFAULT_REPLACEMENT_VALUE = "$1";
@@ -120,6 +122,9 @@ public class ReplaceText extends AbstractProcessor {
     static final AllowableValue ALWAYS_REPLACE = new 
AllowableValue(alwaysReplace, alwaysReplace,
         "Always replaces the entire line or the entire contents of the 
FlowFile (depending on the value of the <Evaluation Mode> property) and does 
not bother searching "
             + "for any value. When this strategy is chosen, the <Search Value> 
property is ignored.");
+    static final AllowableValue SUBSTITUTE_VARIABLES = new 
AllowableValue(SUBSTITUTE_VARIABLES_VALUE, SUBSTITUTE_VARIABLES_VALUE,
+            "Substitute variable references (specified in ${var} form) using 
FlowFile attributes for looking up the replacement value by variable name. "
+                    + "When this strategy is chosen, both the <Search Value> 
and <Replacement Value> properties are ignored.");
 
     public static final PropertyDescriptor SEARCH_VALUE = new 
PropertyDescriptor.Builder()
         .name("Regular Expression")
@@ -163,7 +168,7 @@ public class ReplaceText extends AbstractProcessor {
     public static final PropertyDescriptor REPLACEMENT_STRATEGY = new 
PropertyDescriptor.Builder()
         .name("Replacement Strategy")
         .description("The strategy for how and what to replace within the 
FlowFile's text content.")
-        .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, 
ALWAYS_REPLACE)
+        .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, 
ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
         .defaultValue(REGEX_REPLACE.getValue())
         .required(true)
         .build();
@@ -246,6 +251,7 @@ public class ReplaceText extends AbstractProcessor {
             case appendValue:
             case prependValue:
             case alwaysReplace:
+            case SUBSTITUTE_VARIABLES_VALUE:
             default:
                 // nothing to check, search value is not used
                 break;
@@ -286,6 +292,9 @@ public class ReplaceText extends AbstractProcessor {
             case alwaysReplace:
                 replacementStrategyExecutor = new AlwaysReplace();
                 break;
+            case SUBSTITUTE_VARIABLES_VALUE:
+                replacementStrategyExecutor = new SubstituteVariablesReplace();
+                break;
             default:
                 throw new AssertionError();
         }
@@ -652,6 +661,41 @@ public class ReplaceText extends AbstractProcessor {
         }
     }
 
+    private static class SubstituteVariablesReplace implements 
ReplacementStrategyExecutor {
+        @Override
+        public FlowFile replace(FlowFile flowFile, final ProcessSession 
session, final ProcessContext context, final String evaluateMode, final Charset 
charset, final int maxBufferSize) {
+            final Map<String, String> flowFileAttributes = 
flowFile.getAttributes();
+
+            if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
+                final int flowFileSize = (int) flowFile.getSize();
+                final int bufferSize = Math.min(maxBufferSize, flowFileSize);
+                final byte[] buffer = new byte[bufferSize];
+
+                flowFile = session.write(flowFile, new StreamCallback() {
+                    @Override
+                    public void process(final InputStream in, final 
OutputStream out) throws IOException {
+                        StreamUtils.fillBuffer(in, buffer, false);
+                        final String originalContent = new String(buffer, 0, 
flowFileSize, charset);
+                        final String substitutedContent = 
StringSubstitutor.replace(originalContent, flowFileAttributes);
+                        out.write(substitutedContent.getBytes(charset));
+                    }
+                });
+            } else {
+                flowFile = session.write(flowFile, new 
StreamReplaceCallback(charset, maxBufferSize, 
context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+                        (bw, oneLine) -> {
+                            final String substitutedLine = 
StringSubstitutor.replace(oneLine, flowFileAttributes);
+                            bw.write(substitutedLine);
+                        }));
+            }
+            return flowFile;
+        }
+
+        @Override
+        public boolean isAllDataBufferedForEntireText() {
+            return true;
+        }
+    }
+
     /**
      * If we have a '$' followed by anything other than a number, then escape
      * it. E.g., '$d' becomes '\$d' so that it can be used as a literal in a
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
index 341b75e..88ed097 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
@@ -1641,6 +1641,50 @@ public class TestReplaceText {
         out.assertContentEquals("WO$1R$2D");
     }
 
+    @Test
+    public void testSubstituteVariablesWithEvaluationModeEntireText() {
+        testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1: 
foo\nLine2: bar", ReplaceText.ENTIRE_TEXT, createAttributesMap());
+    }
+
+    @Test
+    public void testSubstituteVariablesWithEvaluationModeLineByLine() {
+        testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1: 
foo\nLine2: bar", ReplaceText.LINE_BY_LINE, createAttributesMap());
+    }
+
+    @Test
+    public void testSubstituteVariablesWhenVariableValueMissing() {
+        testSubstituteVariables("Line1: ${var1}\nLine2: ${var2}", "Line1: 
${var1}\nLine2: ${var2}", ReplaceText.ENTIRE_TEXT, Collections.emptyMap());
+    }
+
+    @Test
+    public void testSubstituteVariablesWhenVariableReferenceEscaped() {
+        testSubstituteVariables("Line1: $${var1}\nLine2: $${var2}", "Line1: 
${var1}\nLine2: ${var2}", ReplaceText.ENTIRE_TEXT, createAttributesMap());
+    }
+
+    @Test
+    public void testSubstituteVariablesWhenVariableNameEmpty() {
+        testSubstituteVariables("Line1: ${}\nLine2: ${}", "Line1: ${}\nLine2: 
${}", ReplaceText.ENTIRE_TEXT, createAttributesMap());
+    }
+
+    private void testSubstituteVariables(String inputContent, String 
expectedContent, String evaluationMode, Map<String, String> attributesMap) {
+        final TestRunner runner = getRunner();
+        runner.setProperty(ReplaceText.EVALUATION_MODE, evaluationMode);
+        runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, 
ReplaceText.SUBSTITUTE_VARIABLES.getValue());
+        runner.enqueue(inputContent, attributesMap);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
+        out.assertContentEquals(expectedContent);
+    }
+
+    private Map<String, String> createAttributesMap() {
+        final Map<String, String> attributesMap = new HashMap<>();
+        attributesMap.put("var1", "foo");
+        attributesMap.put("var2", "bar");
+        return attributesMap;
+    }
+
     /*
      * A repeated alternation regex such as (A|B)* can lead to 
StackOverflowError
      * on large input strings.

Reply via email to