Repository: nifi
Updated Branches:
  refs/heads/master 4700b8653 -> 0a44bad76
NIFI-5168 - ReplaceText Processor Should Use Single FlowFile Processing Instead of Batch

Signed-off-by: Pierre Villard <[email protected]>

This closes #2687.

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0a44bad7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0a44bad7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0a44bad7

Branch: refs/heads/master
Commit: 0a44bad76e6db2c163f6d6a78bbaf8184cdce7f7
Parents: 4700b86
Author: patricker <[email protected]>
Authored: Tue May 8 15:52:46 2018 +0800
Committer: Pierre Villard <[email protected]>
Committed: Wed May 9 18:46:35 2018 +0200

----------------------------------------------------------------------
 .../nifi/processors/standard/ReplaceText.java | 27 +++++++++-----------
 1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/0a44bad7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 6cc9197..de17213 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -45,7 +45,6 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.NLKBufferedReader;
 import org.apache.nifi.stream.io.StreamUtils;
@@ -236,8 +235,8 @@ public class ReplaceText extends AbstractProcessor {

     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 100));
-        if (flowFiles.isEmpty()) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }

@@ -283,22 +282,20 @@ public class ReplaceText extends AbstractProcessor {
                 throw new AssertionError();
         }

-        for (FlowFile flowFile : flowFiles) {
-            if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
-                if (flowFile.getSize() > maxBufferSize && replacementStrategyExecutor.isAllDataBufferedForEntireText()) {
-                    session.transfer(flowFile, REL_FAILURE);
-                    continue;
-                }
+        if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
+            if (flowFile.getSize() > maxBufferSize && replacementStrategyExecutor.isAllDataBufferedForEntireText()) {
+                session.transfer(flowFile, REL_FAILURE);
+                return;
             }
+        }

-            final StopWatch stopWatch = new StopWatch(true);
+        final StopWatch stopWatch = new StopWatch(true);

-            flowFile = replacementStrategyExecutor.replace(flowFile, session, context, evaluateMode, charset, maxBufferSize);
+        flowFile = replacementStrategyExecutor.replace(flowFile, session, context, evaluateMode, charset, maxBufferSize);

-            logger.info("Transferred {} to 'success'", new Object[] {flowFile});
-            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, REL_SUCCESS);
-        }
+        logger.info("Transferred {} to 'success'", new Object[] {flowFile});
+        session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        session.transfer(flowFile, REL_SUCCESS);
     }
