Repository: nifi Updated Branches: refs/heads/master 0ad30e188 -> 692943f01
NIFI-5454: Added EL support and copy.index attribute to DuplicateFlowFile Signed-off-by: Pierre Villard <[email protected]> This closes #2917. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/692943f0 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/692943f0 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/692943f0 Branch: refs/heads/master Commit: 692943f016b19a5b8ff915c5921194081418e9ea Parents: 0ad30e1 Author: Matthew Burgess <[email protected]> Authored: Wed Jul 25 13:55:10 2018 -0400 Committer: Pierre Villard <[email protected]> Committed: Thu Jul 26 14:13:14 2018 +0200 ---------------------------------------------------------------------- .../processors/standard/DuplicateFlowFile.java | 23 ++++++++++++---- .../standard/TestDuplicateFlowFile.java | 28 +++++++++++++++++++- 2 files changed, 45 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/692943f0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java index ecc8e60..4ba5f83 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java @@ -24,9 +24,12 @@ import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -39,14 +42,22 @@ import org.apache.nifi.processor.util.StandardValidators; @SupportsBatching @Tags({"test", "load", "duplicate"}) @InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile") +@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile. The original FlowFile as well as all " ++ "generated copies are sent to the 'success' relationship. In addition, each FlowFile gets an attribute 'copy.index' set to the copy number, where the original FlowFile gets " ++ "a value of zero, and all copies receive incremented integer values.") +@WritesAttributes({ + @WritesAttribute(attribute = "copy.index", description = "A zero-based incrementing integer value based on which copy the FlowFile is.") +}) public class DuplicateFlowFile extends AbstractProcessor { + public static final String COPY_INDEX_ATTRIBUTE = "copy.index"; + static final PropertyDescriptor NUM_COPIES = new PropertyDescriptor.Builder() .name("Number of Copies") + .displayName("Number of Copies") .description("Specifies how many copies of each incoming FlowFile will be made") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue("100") .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); @@ -68,16 +79,18 @@ public class DuplicateFlowFile extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final FlowFile flowFile = session.get(); + FlowFile flowFile = session.get(); if (flowFile == null) { return; } - for (int i=0; i < context.getProperty(NUM_COPIES).asInteger(); i++) { - final FlowFile copy = session.clone(flowFile); + for (int i = 1; i <= context.getProperty(NUM_COPIES).evaluateAttributeExpressions(flowFile).asInteger(); i++) { + FlowFile copy = session.clone(flowFile); + copy = session.putAttribute(copy, COPY_INDEX_ATTRIBUTE, Integer.toString(i)); session.transfer(copy, REL_SUCCESS); } + flowFile = session.putAttribute(flowFile, COPY_INDEX_ATTRIBUTE, "0"); session.transfer(flowFile, REL_SUCCESS); } http://git-wip-us.apache.org/repos/asf/nifi/blob/692943f0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java index 76026bc..8a0b2ef 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java @@ -16,20 +16,46 @@ */ package org.apache.nifi.processors.standard; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; +import java.util.HashMap; +import java.util.List; + +import static org.apache.nifi.processors.standard.DuplicateFlowFile.COPY_INDEX_ATTRIBUTE; + public class TestDuplicateFlowFile { @Test public void test() { + final int numCopies = 100; final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class); - runner.setProperty(DuplicateFlowFile.NUM_COPIES, "100"); + runner.setProperty(DuplicateFlowFile.NUM_COPIES, Integer.toString(numCopies)); runner.enqueue("hello".getBytes()); runner.run(); + runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, numCopies + 1); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DuplicateFlowFile.REL_SUCCESS); + // copy.index starts with 1, original has copy.index = 0 but is transferred last + for (int i = 1; i <= numCopies; i++) { + flowFiles.get(i - 1).assertAttributeEquals(COPY_INDEX_ATTRIBUTE, Integer.toString(i)); + } + flowFiles.get(numCopies).assertAttributeEquals(COPY_INDEX_ATTRIBUTE, "0"); + } + + @Test + public void testNumberOfCopiesEL() { + final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class); + runner.setProperty(DuplicateFlowFile.NUM_COPIES, "${num.copies}"); + + runner.enqueue("hello".getBytes(), new HashMap<String, String>() {{ + put("num.copies", "100"); + }}); + runner.run(); + runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 101); }
