Repository: nifi
Updated Branches:
  refs/heads/master 4700b8653 -> 0a44bad76


NIFI-5168 - ReplaceText Processor Should Use Single FlowFile Processing Instead 
of Batch

Signed-off-by: Pierre Villard <[email protected]>

This closes #2687.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0a44bad7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0a44bad7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0a44bad7

Branch: refs/heads/master
Commit: 0a44bad76e6db2c163f6d6a78bbaf8184cdce7f7
Parents: 4700b86
Author: patricker <[email protected]>
Authored: Tue May 8 15:52:46 2018 +0800
Committer: Pierre Villard <[email protected]>
Committed: Wed May 9 18:46:35 2018 +0200

----------------------------------------------------------------------
 .../nifi/processors/standard/ReplaceText.java   | 27 +++++++++-----------
 1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0a44bad7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 6cc9197..de17213 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -45,7 +45,6 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.NLKBufferedReader;
 import org.apache.nifi.stream.io.StreamUtils;
@@ -236,8 +235,8 @@ public class ReplaceText extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final List<FlowFile> flowFiles = 
session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 100));
-        if (flowFiles.isEmpty()) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
             return;
         }
 
@@ -283,22 +282,20 @@ public class ReplaceText extends AbstractProcessor {
                 throw new AssertionError();
         }
 
-        for (FlowFile flowFile : flowFiles) {
-            if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
-                if (flowFile.getSize() > maxBufferSize && 
replacementStrategyExecutor.isAllDataBufferedForEntireText()) {
-                    session.transfer(flowFile, REL_FAILURE);
-                    continue;
-                }
+        if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
+            if (flowFile.getSize() > maxBufferSize && 
replacementStrategyExecutor.isAllDataBufferedForEntireText()) {
+                session.transfer(flowFile, REL_FAILURE);
+                return;
             }
+        }
 
-            final StopWatch stopWatch = new StopWatch(true);
+        final StopWatch stopWatch = new StopWatch(true);
 
-            flowFile = replacementStrategyExecutor.replace(flowFile, session, 
context, evaluateMode, charset, maxBufferSize);
+        flowFile = replacementStrategyExecutor.replace(flowFile, session, 
context, evaluateMode, charset, maxBufferSize);
 
-            logger.info("Transferred {} to 'success'", new Object[] 
{flowFile});
-            session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, REL_SUCCESS);
-        }
+        logger.info("Transferred {} to 'success'", new Object[] {flowFile});
+        session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        session.transfer(flowFile, REL_SUCCESS);
     }
 
 

Reply via email to