Repository: nifi
Updated Branches:
  refs/heads/master 0ad30e188 -> 692943f01


NIFI-5454: Added EL support and copy.index attribute to DuplicateFlowFile

Signed-off-by: Pierre Villard <[email protected]>

This closes #2917.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/692943f0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/692943f0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/692943f0

Branch: refs/heads/master
Commit: 692943f016b19a5b8ff915c5921194081418e9ea
Parents: 0ad30e1
Author: Matthew Burgess <[email protected]>
Authored: Wed Jul 25 13:55:10 2018 -0400
Committer: Pierre Villard <[email protected]>
Committed: Thu Jul 26 14:13:14 2018 +0200

----------------------------------------------------------------------
 .../processors/standard/DuplicateFlowFile.java  | 23 ++++++++++++----
 .../standard/TestDuplicateFlowFile.java         | 28 +++++++++++++++++++-
 2 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/692943f0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java
index ecc8e60..4ba5f83 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java
@@ -24,9 +24,12 @@ import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -39,14 +42,22 @@ import org.apache.nifi.processor.util.StandardValidators;
 @SupportsBatching
 @Tags({"test", "load", "duplicate"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Intended for load testing, this processor will create 
the configured number of copies of each incoming FlowFile")
+@CapabilityDescription("Intended for load testing, this processor will create 
the configured number of copies of each incoming FlowFile. The original 
FlowFile as well as all "
++ "generated copies are sent to the 'success' relationship. In addition, each 
FlowFile gets an attribute 'copy.index' set to the copy number, where the 
original FlowFile gets "
++ "a value of zero, and all copies receive incremented integer values.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "copy.index", description = "A zero-based 
incrementing integer value based on which copy the FlowFile is.")
+})
 public class DuplicateFlowFile extends AbstractProcessor {
 
+    public static final String COPY_INDEX_ATTRIBUTE = "copy.index";
+
     static final PropertyDescriptor NUM_COPIES = new 
PropertyDescriptor.Builder()
     .name("Number of Copies")
+    .displayName("Number of Copies")
     .description("Specifies how many copies of each incoming FlowFile will be 
made")
     .required(true)
-    .expressionLanguageSupported(false)
+    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
     .defaultValue("100")
     .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
     .build();
@@ -68,16 +79,18 @@ public class DuplicateFlowFile extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final FlowFile flowFile = session.get();
+        FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }
 
-        for (int i=0; i < context.getProperty(NUM_COPIES).asInteger(); i++) {
-            final FlowFile copy = session.clone(flowFile);
+        for (int i = 1; i <= 
context.getProperty(NUM_COPIES).evaluateAttributeExpressions(flowFile).asInteger();
 i++) {
+            FlowFile copy = session.clone(flowFile);
+            copy = session.putAttribute(copy, COPY_INDEX_ATTRIBUTE, 
Integer.toString(i));
             session.transfer(copy, REL_SUCCESS);
         }
 
+        flowFile = session.putAttribute(flowFile, COPY_INDEX_ATTRIBUTE, "0");
         session.transfer(flowFile, REL_SUCCESS);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/692943f0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java
index 76026bc..8a0b2ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java
@@ -16,20 +16,46 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.List;
+
+import static 
org.apache.nifi.processors.standard.DuplicateFlowFile.COPY_INDEX_ATTRIBUTE;
+
 public class TestDuplicateFlowFile {
 
     @Test
     public void test() {
+        final int numCopies = 100;
         final TestRunner runner = 
TestRunners.newTestRunner(DuplicateFlowFile.class);
-        runner.setProperty(DuplicateFlowFile.NUM_COPIES, "100");
+        runner.setProperty(DuplicateFlowFile.NUM_COPIES, 
Integer.toString(numCopies));
 
         runner.enqueue("hello".getBytes());
         runner.run();
 
+        runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 
numCopies + 1);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(DuplicateFlowFile.REL_SUCCESS);
+        // copy.index starts with 1, original has copy.index = 0 but is 
transferred last
+        for (int i = 1; i <= numCopies; i++) {
+            flowFiles.get(i - 1).assertAttributeEquals(COPY_INDEX_ATTRIBUTE, 
Integer.toString(i));
+        }
+        flowFiles.get(numCopies).assertAttributeEquals(COPY_INDEX_ATTRIBUTE, 
"0");
+    }
+
+    @Test
+    public void testNumberOfCopiesEL() {
+        final TestRunner runner = 
TestRunners.newTestRunner(DuplicateFlowFile.class);
+        runner.setProperty(DuplicateFlowFile.NUM_COPIES, "${num.copies}");
+
+        runner.enqueue("hello".getBytes(), new HashMap<String, String>() {{
+            put("num.copies", "100");
+        }});
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 
101);
     }
 

Reply via email to