This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d5c79fdcd1 NIFI-10887: Addressed performance concerns. Use
String.indexOf() instead of Pattern.matcher() when using Literal Replace. Use a
NonFlushableOutputStream when ProcessSession.write() is called. Implemented
hashCode() on AbstractFlowFileQueue. Updated default Run Duration on ReplaceText
from 0 ms to 25 ms. Added a Surround Replacement strategy that allows both
prepending and appending text. Updated unit tests to account for this.
d5c79fdcd1 is described below
commit d5c79fdcd1c806f6376c101f392e333c4a86b805
Author: Mark Payne <[email protected]>
AuthorDate: Mon Nov 28 13:37:53 2022 -0500
NIFI-10887: Addressed performance concerns. Use String.indexOf() instead
of Pattern.matcher() when using Literal Replace. Use a NonFlushableOutputStream
when ProcessSession.write() is called. Implemented hashCode() on
AbstractFlowFileQueue. Updated default Run Duration on ReplaceText from 0 ms to 25
ms. Added a Surround Replacement strategy that allows both prepending and
appending text. Updated unit tests to account for this.
Signed-off-by: Matthew Burgess <[email protected]>
This closes #6724
---
.../repository/StandardProcessSession.java | 9 ++-
.../service/StandardConfigurationContext.java | 13 ++-
.../controller/queue/AbstractFlowFileQueue.java | 5 ++
.../nifi/processors/standard/ReplaceText.java | 92 +++++++++++++++++-----
.../nifi/processors/standard/TestReplaceText.java | 35 ++++++++
5 files changed, 126 insertions(+), 28 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 43b8155f4e..9234d20edf 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2989,7 +2989,8 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
ensureNotAppending(newClaim);
final OutputStream rawStream = claimCache.write(newClaim);
- final OutputStream disableOnClose = new
DisableOnCloseOutputStream(rawStream);
+ final OutputStream nonFlushable = new
NonFlushableOutputStream(rawStream);
+ final OutputStream disableOnClose = new
DisableOnCloseOutputStream(nonFlushable);
final ByteCountingOutputStream countingOut = new
ByteCountingOutputStream(disableOnClose);
final FlowFile sourceFlowFile = source;
@@ -3125,7 +3126,8 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
ensureNotAppending(newClaim);
try (final OutputStream stream = claimCache.write(newClaim);
- final OutputStream disableOnClose = new
DisableOnCloseOutputStream(stream);
+ final NonFlushableOutputStream nonFlushableOutputStream = new
NonFlushableOutputStream(stream);
+ final OutputStream disableOnClose = new
DisableOnCloseOutputStream(nonFlushableOutputStream);
final ByteCountingOutputStream countingOut = new
ByteCountingOutputStream(disableOnClose)) {
try {
writeRecursionSet.add(source);
@@ -3417,7 +3419,8 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
final InputStream disableOnCloseIn = new
DisableOnCloseInputStream(limitedIn);
final ByteCountingInputStream countingIn = new
ByteCountingInputStream(disableOnCloseIn, bytesRead);
final OutputStream os = claimCache.write(newClaim);
- final OutputStream disableOnCloseOut = new
DisableOnCloseOutputStream(os);
+ final OutputStream nonFlushableOut = new
NonFlushableOutputStream(os);
+ final OutputStream disableOnCloseOut = new
DisableOnCloseOutputStream(nonFlushableOut);
final ByteCountingOutputStream countingOut = new
ByteCountingOutputStream(disableOnCloseOut)) {
writeRecursionSet.add(source);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
index 1a2e63eca8..8298c5c3dc 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
@@ -116,10 +116,15 @@ public class StandardConfigurationContext implements
ConfigurationContext {
public PropertyValue getProperty(final PropertyDescriptor property) {
final String configuredValue = properties.get(property);
- // We need to get the 'canonical representation' of the property
descriptor from the component itself,
- // since the supplied PropertyDescriptor may not have the proper
default value.
- final PropertyDescriptor resolvedDescriptor =
component.getPropertyDescriptor(property.getName());
- final String resolvedValue = (configuredValue == null) ?
resolvedDescriptor.getDefaultValue() : configuredValue;
+ final String resolvedValue;
+ if (configuredValue == null) {
+ // We need to get the 'canonical representation' of the property
descriptor from the component itself,
+ // since the supplied PropertyDescriptor may not have the proper
default value.
+ final PropertyDescriptor resolvedDescriptor =
component.getPropertyDescriptor(property.getName());
+ resolvedValue = resolvedDescriptor.getDefaultValue();
+ } else {
+ resolvedValue = configuredValue;
+ }
final ResourceContext resourceContext = new
StandardResourceContext(new StandardResourceReferenceFactory(), property);
return new StandardPropertyValue(resourceContext, resolvedValue,
serviceLookup, component.getParameterLookup(), preparedQueries.get(property),
variableRegistry);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
index 609734a36a..e4038603f8 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -500,4 +500,9 @@ public abstract class AbstractFlowFileQueue implements
FlowFileQueue {
public List<FlowFileRecord> poll(FlowFileFilter filter,
Set<FlowFileRecord> expiredRecords) {
return poll(filter, expiredRecords,
PollStrategy.UNPENALIZED_FLOWFILES);
}
+
+ @Override
+ public int hashCode() {
+ return identifier.hashCode();
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 2b1effbc73..177eddab0d 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.commons.text.StringSubstitutor;
+import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -74,11 +75,11 @@ import java.util.regex.Pattern;
@EventDriven
@SideEffectFree
-@SupportsBatching
+@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify",
"Regex"})
-@CapabilityDescription("Updates the content of a FlowFile by evaluating a
Regular Expression (regex) against it and replacing the section of "
- + "the content that matches the Regular Expression with some alternate
value.")
+@CapabilityDescription("Updates the content of a FlowFile by searching for
some textual value in the FlowFile content (via Regular Expression/regex, or
literal value) and replacing the " +
+ "section of the content that matches with some alternate value. It can
also be used to append or prepend text to the contents of a FlowFile.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class ReplaceText extends AbstractProcessor {
@@ -94,6 +95,7 @@ public class ReplaceText extends AbstractProcessor {
public static final String ENTIRE_TEXT = "Entire text";
public static final String prependValue = "Prepend";
public static final String appendValue = "Append";
+ public static final String surroundValue = "Surround";
public static final String regexReplaceValue = "Regex Replace";
public static final String literalReplaceValue = "Literal Replace";
public static final String alwaysReplace = "Always Replace";
@@ -114,6 +116,9 @@ public class ReplaceText extends AbstractProcessor {
+ "the value will be appended to each line. Similarly, for
\"First-Line\", \"Last-Line\", \"Except-Last-Line\" and \"Except-First-Line\"
Evaluation Modes,"
+ "the value will be appended to header alone, footer alone, all
lines except header and all lines except footer respectively. For \"Entire
Text\" evaluation mode,"
+ "the value will be appended to the entire text.");
+ static final AllowableValue SURROUND = new AllowableValue(surroundValue,
surroundValue,
+ "Prepends text before the start of the FlowFile (or the start of each
line, depending on the configuration of the Evaluation Mode property) " +
+ "as well as appending text to the end of the FlowFile (or the end
of each line, depending on the configuration of the Evaluation Mode property)");
static final AllowableValue LITERAL_REPLACE = new
AllowableValue(literalReplaceValue, literalReplaceValue,
"Search for all instances of the Search Value and replace the matches
with the Replacement Value.");
static final AllowableValue REGEX_REPLACE = new
AllowableValue(regexReplaceValue, regexReplaceValue,
@@ -127,6 +132,14 @@ public class ReplaceText extends AbstractProcessor {
"Substitute variable references (specified in ${var} form) using
FlowFile attributes for looking up the replacement value by variable name. "
+ "When this strategy is chosen, both the <Search Value>
and <Replacement Value> properties are ignored.");
+
+ public static final PropertyDescriptor REPLACEMENT_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Replacement Strategy")
+ .description("The strategy for how and what to replace within the
FlowFile's text content.")
+ .allowableValues(PREPEND, APPEND, SURROUND, REGEX_REPLACE,
LITERAL_REPLACE, ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
+ .defaultValue(REGEX_REPLACE.getValue())
+ .required(true)
+ .build();
public static final PropertyDescriptor SEARCH_VALUE = new
PropertyDescriptor.Builder()
.name("Regular Expression")
.displayName("Search Value")
@@ -134,6 +147,7 @@ public class ReplaceText extends AbstractProcessor {
.required(true)
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dependsOn(REPLACEMENT_STRATEGY, REGEX_REPLACE, LITERAL_REPLACE)
.defaultValue(DEFAULT_REGEX)
.build();
public static final PropertyDescriptor REPLACEMENT_VALUE = new
PropertyDescriptor.Builder()
@@ -145,6 +159,25 @@ public class ReplaceText extends AbstractProcessor {
.required(true)
.defaultValue(DEFAULT_REPLACEMENT_VALUE)
.addValidator(Validator.VALID)
+ .dependsOn(REPLACEMENT_STRATEGY, REGEX_REPLACE, LITERAL_REPLACE,
ALWAYS_REPLACE, PREPEND, APPEND)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ static final PropertyDescriptor PREPEND_TEXT = new
PropertyDescriptor.Builder()
+ .name("Text to Prepend")
+ .displayName("Text to Prepend")
+ .description("The text to prepend to the start of the FlowFile, or
each line, depending on teh configured value of the Evaluation Mode property")
+ .required(true)
+ .addValidator(Validator.VALID)
+ .dependsOn(REPLACEMENT_STRATEGY, SURROUND)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ static final PropertyDescriptor APPEND_TEXT = new
PropertyDescriptor.Builder()
+ .name("Text to Append")
+ .displayName("Text to Append")
+ .description("The text to append to the end of the FlowFile, or each
line, depending on teh configured value of the Evaluation Mode property")
+ .required(true)
+ .addValidator(Validator.VALID)
+ .dependsOn(REPLACEMENT_STRATEGY, SURROUND)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CHARACTER_SET = new
PropertyDescriptor.Builder()
@@ -166,13 +199,6 @@ public class ReplaceText extends AbstractProcessor {
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.build();
- public static final PropertyDescriptor REPLACEMENT_STRATEGY = new
PropertyDescriptor.Builder()
- .name("Replacement Strategy")
- .description("The strategy for how and what to replace within the
FlowFile's text content.")
- .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE,
ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
- .defaultValue(REGEX_REPLACE.getValue())
- .required(true)
- .build();
public static final PropertyDescriptor EVALUATION_MODE = new
PropertyDescriptor.Builder()
.name("Evaluation Mode")
.description("Run the 'Replacement Strategy' against each line
separately (Line-by-Line) or buffer the entire file "
@@ -191,6 +217,8 @@ public class ReplaceText extends AbstractProcessor {
.required(false)
.build();
+
+
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -209,11 +237,13 @@ public class ReplaceText extends AbstractProcessor {
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(REPLACEMENT_STRATEGY);
properties.add(SEARCH_VALUE);
properties.add(REPLACEMENT_VALUE);
+ properties.add(PREPEND_TEXT);
+ properties.add(APPEND_TEXT);
properties.add(CHARACTER_SET);
properties.add(MAX_BUFFER_SIZE);
- properties.add(REPLACEMENT_STRATEGY);
properties.add(EVALUATION_MODE);
properties.add(LINE_BY_LINE_EVALUATION_MODE);
this.properties = Collections.unmodifiableList(properties);
@@ -271,7 +301,10 @@ public class ReplaceText extends AbstractProcessor {
replacementStrategyExecutor = new PrependReplace();
break;
case appendValue:
- replacementStrategyExecutor = new AppendReplace();
+ replacementStrategyExecutor = new SurroundReplace(null,
REPLACEMENT_VALUE);
+ break;
+ case surroundValue:
+ replacementStrategyExecutor = new
SurroundReplace(PREPEND_TEXT, APPEND_TEXT);
break;
case regexReplaceValue:
// for backward compatibility - if replacement regex is ".*"
then we will simply always replace the content.
@@ -454,23 +487,39 @@ public class ReplaceText extends AbstractProcessor {
}
- private static class AppendReplace implements ReplacementStrategyExecutor {
+ private static class SurroundReplace implements
ReplacementStrategyExecutor {
+ private final PropertyDescriptor prependValueDescriptor;
+ private final PropertyDescriptor appendValueDescriptor;
+
+ public SurroundReplace(final PropertyDescriptor
prependValueDescriptor, final PropertyDescriptor appendValueDescriptor) {
+ this.prependValueDescriptor = prependValueDescriptor;
+ this.appendValueDescriptor = appendValueDescriptor;
+ }
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession
session, final ProcessContext context, final String evaluateMode, final Charset
charset, final int maxBufferSize) {
- final String replacementValue =
context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+ final String prependValue = (prependValueDescriptor == null) ?
null :
context.getProperty(prependValueDescriptor).evaluateAttributeExpressions(flowFile).getValue();
+ final String appendValue =
context.getProperty(appendValueDescriptor).evaluateAttributeExpressions(flowFile).getValue();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final
OutputStream out) throws IOException {
+ if (prependValue != null && !prependValue.isEmpty()) {
+ out.write(prependValue.getBytes(charset));
+ }
+
IOUtils.copy(in, out);
- out.write(replacementValue.getBytes(charset));
+ out.write(appendValue.getBytes(charset));
}
});
} else {
flowFile = session.write(flowFile, new
StreamReplaceCallback(charset, maxBufferSize,
context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
(bw, oneLine) -> {
+ if (prependValue != null && !prependValue.isEmpty()) {
+ bw.write(prependValue);
+ }
+
// we need to find the first carriage return or
new-line so that we can append the new value
// before the line separate. However, we don't want to
do this using a regular expression due
// to performance concerns. So we will find the first
occurrence of either \r or \n and use
@@ -484,7 +533,7 @@ public class ReplaceText extends AbstractProcessor {
}
if (c == '\r' || c == '\n') {
- bw.write(replacementValue);
+ bw.write(appendValue);
foundNewLine = true;
}
@@ -492,7 +541,7 @@ public class ReplaceText extends AbstractProcessor {
}
if (!foundNewLine) {
- bw.write(replacementValue);
+ bw.write(appendValue);
}
}));
}
@@ -641,13 +690,14 @@ public class ReplaceText extends AbstractProcessor {
int matches = 0;
int lastEnd = 0;
- final Matcher matcher = searchPattern.matcher(oneLine);
- while (matcher.find()) {
- bw.write(oneLine, lastEnd, matcher.start() -
lastEnd);
+ int index = oneLine.indexOf(searchValue, lastEnd);
+ while (index >= 0) {
+ bw.write(oneLine, lastEnd, index - lastEnd);
bw.write(replacementValue);
matches++;
- lastEnd = matcher.end();
+ lastEnd = index + searchValue.length();
+ index = oneLine.indexOf(searchValue, lastEnd);
}
if (matches > 0) {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
index 1b5b6e53e5..9023b37835 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
@@ -1631,6 +1631,41 @@ public class TestReplaceText {
runner.assertValid();
}
+ @Test
+ public void testSurroundWithEntireText() {
+ final TestRunner runner = getRunner();
+ runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY,
ReplaceText.SURROUND);
+ runner.setProperty(ReplaceText.PREPEND_TEXT, "<pre>");
+ runner.setProperty(ReplaceText.APPEND_TEXT, "<post>");
+ runner.setProperty(ReplaceText.EVALUATION_MODE,
ReplaceText.ENTIRE_TEXT);
+
+ final String input = "Hello\nThere\nHow are you\nToday?";
+ runner.enqueue(input);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
+
+ final MockFlowFile output =
runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
+ output.assertContentEquals("<pre>" + input + "<post>");
+ }
+
+ @Test
+ public void testSurroundLineByLine() {
+ final TestRunner runner = getRunner();
+ runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY,
ReplaceText.SURROUND);
+ runner.setProperty(ReplaceText.PREPEND_TEXT, "<pre>");
+ runner.setProperty(ReplaceText.APPEND_TEXT, "<post>");
+ runner.setProperty(ReplaceText.EVALUATION_MODE,
ReplaceText.LINE_BY_LINE);
+
+ final String input = "Hello\nThere\nHow are you\nToday?";
+ runner.enqueue(input);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
+
+ final MockFlowFile output =
runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
+
output.assertContentEquals("<pre>Hello<post>\n<pre>There<post>\n<pre>How are
you<post>\n<pre>Today?<post>");
+ }
+
+
@Test
public void testBackReferenceEscapeWithRegexReplaceUsingEL() {
final TestRunner runner = getRunner();