NIFI-190 merged/resolved conflicts of 3 month old submission. Still needs a thorough review
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/6061475e Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/6061475e Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/6061475e Branch: refs/heads/NIFI-190 Commit: 6061475edecb80e1f132614ffb5fbbe45faad9ed Parents: 8e1e355 Author: joewitt <[email protected]> Authored: Tue Mar 17 00:35:43 2015 -0400 Committer: joewitt <[email protected]> Committed: Tue Mar 17 00:35:43 2015 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/HoldFile.java | 285 ------------------- .../index.html | 158 ---------- .../nifi/processors/standard/TestHoldFile.java | 155 ---------- .../nifi/processors/standard/HoldFile.java | 285 +++++++++++++++++++ .../index.html | 158 ++++++++++ .../nifi/processors/standard/TestHoldFile.java | 156 ++++++++++ 6 files changed, 599 insertions(+), 598 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java deleted file mode 100644 index b714a07..0000000 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java +++ /dev/null @@ -1,285 +0,0 @@ -package org.apache.nifi.processors.standard; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.Tags; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; - -@Tags({"hold", "release", "signal"}) -@CapabilityDescription("Holds incoming flow files until a matching signal flow file enters the processor. " - + "Incoming files are classified as either held files or signals. " - + "Held files are routed to the Held relationship until a matching signal has been received.") -public class HoldFile extends AbstractProcessor { - public static final String FLOW_FILE_RELEASE_VALUE = "flow.file.release.value"; - public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); - - public static final PropertyDescriptor MAX_SIGNAL_AGE = new PropertyDescriptor - .Builder().name("Max Signal Age") - .description("The maximum age of a signal that will trigger a file to be released. " - + "Expected format is <duration> <time unit> where <duration> is a positive " - + "integer and <time unit> is one of seconds, minutes, hours") - .required(true).defaultValue("24 hours") - .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) - .build(); - public static final PropertyDescriptor RELEASE_SIGNAL_ATTRIBUTE = new PropertyDescriptor - .Builder().name("Release Signal Attribute") - .description("The flow file attribute name on held files that will be checked against values in the signal cache.") - .required(true). - addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) - .build(); - public static final PropertyDescriptor COPY_SIGNAL_ATTRIBUTES = new PropertyDescriptor - .Builder().name("Copy Signal Attributes?") - .description("If true, a signal's flow file attributes will be copied to its matching held files, " - + "with the exception of flow.file.release.value and the configured Signal Failure Attribute") - .required(true) - .defaultValue("true") - .allowableValues("true", "false") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); - public static final PropertyDescriptor FAILURE_ATTRIBUTE = new PropertyDescriptor - .Builder().name("Signal Failure Attribute") - .description("Signals that have this attribute set to 'true' " - + "will cause matching held flow files to route to Failure. If this attribute " - + "is not populated, it is assumed that the flow file succeeds.") - .required(false) - .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) - .build(); - - public static final Relationship REL_HOLD = new Relationship.Builder() - .name("hold").description("Held files whose signals have not been received are routed here").build(); - - public static final Relationship REL_RELEASE = new Relationship.Builder() - .name("release").description("Held files whose signals have been received are routed here").build(); - - public static final Relationship REL_EXPIRED = new Relationship.Builder() - .name("expired").description("Held files that expire are routed here").build(); - - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure").description("Held files whose signal contains the Signal Failure Attribute are " - + "routed here, indicating a processing failure upstream").build(); - - private Set<String> excludedAttributes = new HashSet<>(); - - private Set<Relationship> relationships; - private List<PropertyDescriptor> descriptors; - - private String failureAttribute; - private volatile Map<String, ReleaseAttributes> releaseValues = new ConcurrentHashMap<>(); - private long expirationDuration = 0L; - private String releaseSignalAttribute; - private boolean copyAttributes; - - - @Override - protected void init(final ProcessorInitializationContext context) { - final Set<Relationship> relationships = new HashSet<>(); - relationships .add(REL_HOLD); - relationships .add(REL_RELEASE); - relationships .add(REL_EXPIRED); - relationships .add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); - - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(RELEASE_SIGNAL_ATTRIBUTE); - descriptors.add(MAX_SIGNAL_AGE); - descriptors.add(COPY_SIGNAL_ATTRIBUTES); - descriptors.add(FAILURE_ATTRIBUTE); - this.descriptors = Collections.unmodifiableList(descriptors); - } - - @Override - public final Set<Relationship> getRelationships() { - return relationships; - } - - @Override - public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - this.expirationDuration = context.getProperty(MAX_SIGNAL_AGE) - .asTimePeriod(TimeUnit.MILLISECONDS); - this.releaseSignalAttribute = context.getProperty( - RELEASE_SIGNAL_ATTRIBUTE).getValue(); - this.copyAttributes = context.getProperty( - COPY_SIGNAL_ATTRIBUTES).asBoolean(); - PropertyValue failureAttrValue = context.getProperty(FAILURE_ATTRIBUTE); - if (failureAttrValue != null) { - this.failureAttribute = failureAttrValue.getValue(); - } - - this.excludedAttributes.clear(); - this.excludedAttributes.add(FLOW_FILE_RELEASE_VALUE); - if (this.failureAttribute != null) { - this.excludedAttributes.add(this.failureAttribute); - } - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - int encountered = 0; - FlowFile flowFile = null; - while((flowFile = session.get()) != null) { - String releaseValueAttribute = flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE); - - if ((releaseValueAttribute == null) - || (releaseValueAttribute.trim().length() == 0)) { - this.processHoldFile(flowFile, session); - } else { - this.processSignal(flowFile, session); - } - encountered++; - } - - if (!isScheduled()) { - return; - } - if (encountered == 0) { - context.yield(); - } - } - - /** - * Stores a signal and all associated flow file attributes, as applicable. - * @param flowFile - * @param session - */ - private void processSignal(FlowFile flowFile, ProcessSession session) { - ReleaseAttributes releaseAttributes = new ReleaseAttributes(); - // Store any propagated signal attributes - if (this.copyAttributes) { - releaseAttributes.attributes.putAll(flowFile.getAttributes()); - } - - // Check if the signal indicates a failure upstream - String failureValue = flowFile.getAttribute(failureAttribute); - releaseAttributes.failed = "true".equalsIgnoreCase(failureValue); - String releaseValue = flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE); - releaseValues.put(releaseValue , releaseAttributes); - if (getLogger().isDebugEnabled()) { - getLogger().debug("{} is marking flow files with {}={} for release", - new Object[] {flowFile, releaseSignalAttribute, releaseValue }); - } - - session.remove(flowFile); - } - - /** - * Transfers the held file to either Release (if a release signal has - * been found), Hold (if not), or Expired (if it has expired). - * @param flowFile - * @param session - */ - private void processHoldFile(FlowFile flowFile, ProcessSession session) { - String attributeValue = flowFile.getAttribute(releaseSignalAttribute); - - // Do we have a matching attribute to be released? - if (attributeValue != null && releaseValues.containsKey(attributeValue)) { - ReleaseAttributes releaseAttributes = releaseValues - .get(attributeValue); - boolean copyAttributes = (!releaseAttributes.attributes.isEmpty() && this.copyAttributes); - if (copyAttributes) { - Map<String, String> signalAttributes = getNewAttributes( - flowFile.getAttributes(), releaseAttributes.attributes, - this.excludedAttributes); - flowFile = session.putAllAttributes(flowFile, signalAttributes); - flowFile = session.removeAllAttributes(flowFile, - this.excludedAttributes); - } - if (getLogger().isDebugEnabled()) { - getLogger().debug("{} was released. Attributes copied? {}", - new Object[] { flowFile, copyAttributes }); - } - // Remove the signal - synchronized (releaseValues) { - releaseValues.remove(attributeValue); - if (releaseAttributes.failed) { - getLogger() - .warn("Received a non-success value for {}, routing to failure: {}", - new Object[] { failureAttribute, flowFile }); - session.transfer(flowFile, REL_FAILURE); - } else { - session.transfer(flowFile, REL_RELEASE); - } - } - } else { - // We don't have a signal yet. Let's check if it has expired - long entryDate = flowFile.getEntryDate(); - if (isExpired(entryDate)) { - session.transfer(flowFile, REL_EXPIRED); - // It expired. We likely have some expired signals too, so let's - // check - synchronized (releaseValues) { - for (Iterator<Entry<String, ReleaseAttributes>> it = releaseValues - .entrySet().iterator(); it.hasNext();) { - Entry<String, ReleaseAttributes> entry = it.next(); - ReleaseAttributes releaseAttributes = entry.getValue(); - if (isExpired(releaseAttributes.creationTimestamp)) { - releaseAttributes.attributes.clear(); - it.remove(); - } - } - } - if (getLogger().isDebugEnabled()) { - getLogger().debug("Expiring {}", new Object[] { flowFile }); - } - } else { - // It hasn't expired, so transfer to Hold - session.transfer(flowFile, REL_HOLD); - if (getLogger().isTraceEnabled()) { - getLogger().trace("Holding {}", new Object[] { flowFile }); - } - } - } - } - - // Compares two maps and returns only those attributes that are new - private static Map<String, String> getNewAttributes( - Map<String, String> existingAttributes, - Map<String, String> attributesToAdd, Set<String> alwaysOverrideKeys) { - Map<String, String> newAttributes = new HashMap<>(); - for (Entry<String, String> entryToAdd : attributesToAdd.entrySet()) { - if (!existingAttributes.containsKey(entryToAdd.getKey()) - || alwaysOverrideKeys.contains(entryToAdd.getKey())) { - newAttributes.put(entryToAdd.getKey(), entryToAdd.getValue()); - } - } - return newAttributes; - } - - // Returns true if the given time is far enough in the past to expire - private boolean isExpired(Long creationTimestamp) { - return creationTimestamp + expirationDuration < System.currentTimeMillis(); - } - - public static class ReleaseAttributes { - private long creationTimestamp = System.currentTimeMillis(); - private boolean failed; - private Map<String, String> attributes = new HashMap<>(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html b/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html deleted file mode 100644 index 221c8cb..0000000 --- a/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html +++ /dev/null @@ -1,158 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - <head> - <meta charset="utf-8" /> - <title>HoldFile</title> - - - <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> - </head> - - <body> - - <!-- Processor Documentation ================================================== --> - <h2>Description:</h2> - <p>This processor holds incoming flow files until a matching signal - flow file enters the processor. Incoming files are classified as either - held files or signals. Held files are routed to the Held relationship - until a matching signal has been received. - </p> - - <p>This processor primarily supports the following use case: - <ul> - <li>A file in format A needs to be sent to Endpoint A</li> - <li>The same file in format B needs to be sent to Endpoint B, - but should not proceed until A has reached Endpoint A. - This restriction is most common when Endpoint B - requires some output of Endpoint A.</li> - </ul> - </p> - <p>Signal files are distinguished from held files by the presence of the - "flow.file.release.value" attribute on the signal files. When a signal - file enters, its value is cached in the processor. The processor is also - configured with a "Release Signal Attribute". Held files with this - attribute whose value matches a received signal value will be released.</p> - - <p>An example: HoldFile is configured with Release Signal Attribute = "myId". - Its 'hold' relationship routes back onto itself. - <ol> - <li>flowFile 1 { myId : "123" } enters HoldFile. It is routed to the 'hold' relationship.</li> - <li>flowFile 2 { flow.file.release.value : "123" } enters HoldFile. - Flowfile 1 is then routed to 'release'.</li> - </ol> - - <p>Signal flow files will also copy their attributes to matching held files, - unless otherwise indicated. This is what allows the output of - Endpoint A to pass to Endpoint B, above. - </p> - <p> - <strong>Uses Attributes:</strong> - </p> - <table border="1"> - <thead> - <tr> - <th>Attribute Name</th> - <th>Description</th> - </tr> - </thead> - <tbody> - <tr> - <td>flow.file.release.value</td> - <td>Files with this attribute are considered signals. The attribute - value is stored in a cache in the processor, and any held flow files - are released if their Release Signal Attribute (configured in the processor) - matches this value.</td> - </tr> - </tbody> - </table> - - <p> - <strong>Properties:</strong> - </p> - <p>In the list below, the names of required properties appear - in bold. Any other properties (not in bold) are considered optional. - If a property has a default value, it is indicated. If a property - supports the use of the NiFi Expression Language (or simply, - "expression language"), that is also indicated.</p> - <ul> - <li><strong>Release Signal Attribute</strong> - <ul> - <li>The flow file attribute name on held files that will be checked against the signal cache.</li> - <li>Default value: none</li> - <li>Supports expression language: false</li> - </ul> - </li> - <li><strong>Max Signal Age</strong> - <ul> - <li>The maximum age of a signal that will trigger a file to be released. - Signals are expired from the cache after this duration, and - any matching held files will be routed to the 'expired' relationship. - Expected format is <duration> <time unit> where <duration> is a positive - integer and <time unit> is one of seconds, minutes, hours</li> - <li>Default value: 24 hours</li> - <li>Supports expression language: false</li> - </ul></li> - <li><strong>Copy Signal Attributes?</strong> - <ul> - <li>If true, a signal's flow file attributes will be copied to its matching held files, - with the exception of flow.file.release.value and the configured Signal Failure Attribute.</li> - <li>Default value: true</li> - <li>Supports expression language: false</li> - </ul> - </li> - <li>Signal Failure Attribute - <ul> - <li>Signals that have this attribute set to 'true' - will cause matching held flow files to route to Failure. If this attribute - is not populated, it is assumed that the flow file succeeds. This allows - upstream failures to propagate to held files.</li> - <li>Default value: no default</li> - <li>Supports expression language: false</li> - </ul> - </li> - </ul> - - <p> - <strong>Relationships:</strong> - </p> - <ul> - <li>hold - <ul> - <li>Held files whose signals have not been received are routed here. The most common pattern is to - route this relationship back into the processor, and set it on a timer.</li> - </ul> - </li> - <li>release - <ul> - <li>Held files whose signals have been received are routed here.</li> - </ul> - </li> - <li>failure - <ul> - <li>Held files whose signal contains the Signal Failure Attribute are - routed here, indicating a processing failure upstream.</li> - </ul> - </li> - <li>expired - <ul> - <li>Held files that expire are routed here.</li> - </ul> - </li> - </ul> - - </body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java deleted file mode 100644 index f7b2678..0000000 --- a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java +++ /dev/null @@ -1,155 +0,0 @@ -package org.apache.nifi.processors.standard; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Before; -import org.junit.Test; - -public class TestHoldFile { - private TestRunner testRunner; - - @Before - public void init() { - testRunner = TestRunners.newTestRunner(HoldFile.class); - } - - @Test - public void testHoldAndRelease() throws IOException { - testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours"); - testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, "identifier"); - testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed"); - testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "true"); - - // One signal file - Map<String, String> signal = new HashMap<String, String>(); - signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567"); - signal.put("return.value", "a response"); - testRunner.enqueue("signal".getBytes(), signal); - - // Two normal flow files to be held - Map<String, String> metadata = new HashMap<String, String>(); - metadata.put("identifier", "123456"); - - Map<String, String> metadata2 = new HashMap<String, String>(); - metadata2.put("identifier", "1234567"); - - testRunner.enqueue("file1".getBytes(), metadata); - testRunner.enqueue("file2".getBytes(), metadata2); - - testRunner.run(); - - // One was held because it didn't match - testRunner.assertTransferCount(HoldFile.REL_HOLD, 1); - // The matching one was released - testRunner.assertTransferCount(HoldFile. REL_RELEASE, 1); - - // Make sure the propagated attribute from signal was added to the held flow file - MockFlowFile released = testRunner - .getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0); - assertEquals("Signal attributes were not copied to held file", - "a response", released.getAttribute("return.value")); - assertNull("flow.file.release.value should not be propagated", released - .getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE)); - - // None expired - testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0); - - // None failed - testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0); - } - - @Test - public void testHoldAndRelease_noCopy() throws IOException { - testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours"); - testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, "identifier"); - testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed"); - testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "false"); - - // One signal file - Map<String, String> signal = new HashMap<String, String>(); - signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567"); - signal.put("return.value", "a response"); - testRunner.enqueue("signal".getBytes(), signal); - - // Two normal flow files to be held - Map<String, String> metadata = new HashMap<String, String>(); - metadata.put("identifier", "123456"); - - Map<String, String> metadata2 = new HashMap<String, String>(); - metadata2.put("identifier", "1234567"); - - testRunner.enqueue("file1".getBytes(), metadata); - testRunner.enqueue("file2".getBytes(), metadata2); - - testRunner.run(); - - // One was held because it didn't match - testRunner.assertTransferCount(HoldFile.REL_HOLD, 1); - // The matching one was released - testRunner.assertTransferCount(HoldFile. REL_RELEASE, 1); - - // Make sure the propagated attribute from signal was added to the held flow file - MockFlowFile released = testRunner - .getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0); - assertNull("Attributes were not supposed to be copied", released - .getAttribute("return.value")); - assertNull("flow.file.release.value should not be propagated", released - .getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE)); - - // None expired - testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0); - - // None failed - testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0); - } - - @Test - public void testHoldAndRelease_failure() throws IOException { - testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours"); - testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, "identifier"); - testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed"); - testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "false"); - - // One signal file - Map<String, String> signal = new HashMap<String, String>(); - signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567"); - signal.put("return.value", "a response"); - signal.put("service.failed", "true"); - testRunner.enqueue("signal".getBytes(), signal); - - // Two normal flow files to be held - Map<String, String> metadata = new HashMap<String, String>(); - metadata.put("identifier", "123456"); - - Map<String, String> metadata2 = new HashMap<String, String>(); - metadata2.put("identifier", "1234567"); - - testRunner.enqueue("file1".getBytes(), metadata); - testRunner.enqueue("file2".getBytes(), metadata2); - - testRunner.run(); - - // One was held because it didn't match - testRunner.assertTransferCount(HoldFile.REL_HOLD, 1); - // The matching one was routed to failure because of the - // service.failed attribute - testRunner.assertTransferCount(HoldFile. REL_FAILURE, 1); - - // Make sure the propagated attribute from signal was added to the held flow file - MockFlowFile released = testRunner - .getFlowFilesForRelationship(HoldFile.REL_FAILURE).get(0); - assertNull("Attributes were not supposed to be copied", released - .getAttribute("return.value")); - - // None expired - testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java new file mode 100644 index 0000000..5dbaa06 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java @@ -0,0 +1,285 @@ +package org.apache.nifi.processors.standard; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +@Tags({"hold", "release", "signal"}) +@CapabilityDescription("Holds incoming flow files until a matching signal flow file enters the processor. " + + "Incoming files are classified as either held files or signals. " + + "Held files are routed to the Held relationship until a matching signal has been received.") +public class HoldFile extends AbstractProcessor { + public static final String FLOW_FILE_RELEASE_VALUE = "flow.file.release.value"; + public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); + + public static final PropertyDescriptor MAX_SIGNAL_AGE = new PropertyDescriptor + .Builder().name("Max Signal Age") + .description("The maximum age of a signal that will trigger a file to be released. " + + "Expected format is <duration> <time unit> where <duration> is a positive " + + "integer and <time unit> is one of seconds, minutes, hours") + .required(true).defaultValue("24 hours") + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) + .build(); + public static final PropertyDescriptor RELEASE_SIGNAL_ATTRIBUTE = new PropertyDescriptor + .Builder().name("Release Signal Attribute") + .description("The flow file attribute name on held files that will be checked against values in the signal cache.") + .required(true). + addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .build(); + public static final PropertyDescriptor COPY_SIGNAL_ATTRIBUTES = new PropertyDescriptor + .Builder().name("Copy Signal Attributes?") + .description("If true, a signal's flow file attributes will be copied to its matching held files, " + + "with the exception of flow.file.release.value and the configured Signal Failure Attribute") + .required(true) + .defaultValue("true") + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor FAILURE_ATTRIBUTE = new PropertyDescriptor + .Builder().name("Signal Failure Attribute") + .description("Signals that have this attribute set to 'true' " + + "will cause matching held flow files to route to Failure. If this attribute " + + "is not populated, it is assumed that the flow file succeeds.") + .required(false) + .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .build(); + + public static final Relationship REL_HOLD = new Relationship.Builder() + .name("hold").description("Held files whose signals have not been received are routed here").build(); + + public static final Relationship REL_RELEASE = new Relationship.Builder() + .name("release").description("Held files whose signals have been received are routed here").build(); + + public static final Relationship REL_EXPIRED = new Relationship.Builder() + .name("expired").description("Held files that expire are routed here").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure").description("Held files whose signal contains the Signal Failure Attribute are " + + "routed here, indicating a processing failure upstream").build(); + + private final Set<String> excludedAttributes = new HashSet<>(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> descriptors; + + private String failureAttribute; + private volatile Map<String, ReleaseAttributes> releaseValues = new ConcurrentHashMap<>(); + private long expirationDuration = 0L; + private String releaseSignalAttribute; + private boolean copyAttributes; + + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships .add(REL_HOLD); + relationships .add(REL_RELEASE); + relationships .add(REL_EXPIRED); + relationships .add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(RELEASE_SIGNAL_ATTRIBUTE); + descriptors.add(MAX_SIGNAL_AGE); + descriptors.add(COPY_SIGNAL_ATTRIBUTES); + descriptors.add(FAILURE_ATTRIBUTE); + this.descriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public final Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + this.expirationDuration = context.getProperty(MAX_SIGNAL_AGE) + .asTimePeriod(TimeUnit.MILLISECONDS); + this.releaseSignalAttribute = context.getProperty( + RELEASE_SIGNAL_ATTRIBUTE).getValue(); + this.copyAttributes = context.getProperty( + COPY_SIGNAL_ATTRIBUTES).asBoolean(); + PropertyValue failureAttrValue = context.getProperty(FAILURE_ATTRIBUTE); + if (failureAttrValue != null) { + this.failureAttribute = failureAttrValue.getValue(); + } + + this.excludedAttributes.clear(); + this.excludedAttributes.add(FLOW_FILE_RELEASE_VALUE); + if (this.failureAttribute != null) { + this.excludedAttributes.add(this.failureAttribute); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + int encountered = 0; + FlowFile flowFile = null; + while((flowFile = session.get()) != null) { + String releaseValueAttribute = flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE); + + if ((releaseValueAttribute == null) + || (releaseValueAttribute.trim().length() == 0)) { + this.processHoldFile(flowFile, session); + } else { + this.processSignal(flowFile, session); + } + encountered++; + } + + if (!isScheduled()) { + return; + } + if (encountered == 0) { + context.yield(); + } + } + + /** + * Stores a signal and all associated flow file attributes, as applicable. + * @param flowFile + * @param session + */ + private void processSignal(FlowFile flowFile, ProcessSession session) { + ReleaseAttributes releaseAttributes = new ReleaseAttributes(); + // Store any propagated signal attributes + if (this.copyAttributes) { + releaseAttributes.attributes.putAll(flowFile.getAttributes()); + } + + // Check if the signal indicates a failure upstream + String failureValue = flowFile.getAttribute(failureAttribute); + releaseAttributes.failed = "true".equalsIgnoreCase(failureValue); + String releaseValue = flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE); + releaseValues.put(releaseValue , releaseAttributes); + if (getLogger().isDebugEnabled()) { + getLogger().debug("{} is marking flow files with {}={} for release", + new Object[] {flowFile, releaseSignalAttribute, releaseValue }); + } + + session.remove(flowFile); + } + + /** + * Transfers the held file to either Release (if a release signal has + * been found), Hold (if not), or Expired (if it has expired). + * @param flowFile + * @param session + */ + private void processHoldFile(FlowFile flowFile, ProcessSession session) { + String attributeValue = flowFile.getAttribute(releaseSignalAttribute); + + // Do we have a matching attribute to be released? + if (attributeValue != null && releaseValues.containsKey(attributeValue)) { + ReleaseAttributes releaseAttributes = releaseValues + .get(attributeValue); + boolean copyAttributes = (!releaseAttributes.attributes.isEmpty() && this.copyAttributes); + if (copyAttributes) { + Map<String, String> signalAttributes = getNewAttributes( + flowFile.getAttributes(), releaseAttributes.attributes, + this.excludedAttributes); + flowFile = session.putAllAttributes(flowFile, signalAttributes); + flowFile = session.removeAllAttributes(flowFile, + this.excludedAttributes); + } + if (getLogger().isDebugEnabled()) { + getLogger().debug("{} was released. Attributes copied? {}", + new Object[] { flowFile, copyAttributes }); + } + // Remove the signal + synchronized (releaseValues) { + releaseValues.remove(attributeValue); + if (releaseAttributes.failed) { + getLogger() + .warn("Received a non-success value for {}, routing to failure: {}", + new Object[] { failureAttribute, flowFile }); + session.transfer(flowFile, REL_FAILURE); + } else { + session.transfer(flowFile, REL_RELEASE); + } + } + } else { + // We don't have a signal yet. Let's check if it has expired + long entryDate = flowFile.getEntryDate(); + if (isExpired(entryDate)) { + session.transfer(flowFile, REL_EXPIRED); + // It expired. We likely have some expired signals too, so let's + // check + synchronized (releaseValues) { + for (Iterator<Entry<String, ReleaseAttributes>> it = releaseValues + .entrySet().iterator(); it.hasNext();) { + Entry<String, ReleaseAttributes> entry = it.next(); + ReleaseAttributes releaseAttributes = entry.getValue(); + if (isExpired(releaseAttributes.creationTimestamp)) { + releaseAttributes.attributes.clear(); + it.remove(); + } + } + } + if (getLogger().isDebugEnabled()) { + getLogger().debug("Expiring {}", new Object[] { flowFile }); + } + } else { + // It hasn't expired, so transfer to Hold + session.transfer(flowFile, REL_HOLD); + if (getLogger().isTraceEnabled()) { + getLogger().trace("Holding {}", new Object[] { flowFile }); + } + } + } + } + + // Compares two maps and returns only those attributes that are new + private static Map<String, String> getNewAttributes( + Map<String, String> existingAttributes, + Map<String, String> attributesToAdd, Set<String> alwaysOverrideKeys) { + Map<String, String> newAttributes = new HashMap<>(); + for (Entry<String, String> entryToAdd : attributesToAdd.entrySet()) { + if (!existingAttributes.containsKey(entryToAdd.getKey()) + || alwaysOverrideKeys.contains(entryToAdd.getKey())) { + newAttributes.put(entryToAdd.getKey(), entryToAdd.getValue()); + } + } + return newAttributes; + } + + // Returns true if the given time is far enough in the past to expire + private boolean isExpired(Long creationTimestamp) { + return creationTimestamp + expirationDuration < System.currentTimeMillis(); + } + + public static class ReleaseAttributes { + private long creationTimestamp = System.currentTimeMillis(); + private boolean failed; + private Map<String, String> attributes = new HashMap<>(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html new file mode 100644 index 0000000..221c8cb --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html @@ -0,0 +1,158 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>HoldFile</title> + + + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> + </head> + + <body> + + <!-- Processor Documentation ================================================== --> + <h2>Description:</h2> + <p>This processor holds incoming flow files until a matching signal + flow file enters the processor. Incoming files are classified as either + held files or signals. Held files are routed to the Held relationship + until a matching signal has been received. + </p> + + <p>This processor primarily supports the following use case: + <ul> + <li>A file in format A needs to be sent to Endpoint A</li> + <li>The same file in format B needs to be sent to Endpoint B, + but should not proceed until A has reached Endpoint A. + This restriction is most common when Endpoint B + requires some output of Endpoint A.</li> + </ul> + </p> + <p>Signal files are distinguished from held files by the presence of the + "flow.file.release.value" attribute on the signal files. When a signal + file enters, its value is cached in the processor. The processor is also + configured with a "Release Signal Attribute". Held files with this + attribute whose value matches a received signal value will be released.</p> + + <p>An example: HoldFile is configured with Release Signal Attribute = "myId". + Its 'hold' relationship routes back onto itself. + <ol> + <li>flowFile 1 { myId : "123" } enters HoldFile. It is routed to the 'hold' relationship.</li> + <li>flowFile 2 { flow.file.release.value : "123" } enters HoldFile. + Flowfile 1 is then routed to 'release'.</li> + </ol> + + <p>Signal flow files will also copy their attributes to matching held files, + unless otherwise indicated. This is what allows the output of + Endpoint A to pass to Endpoint B, above. + </p> + <p> + <strong>Uses Attributes:</strong> + </p> + <table border="1"> + <thead> + <tr> + <th>Attribute Name</th> + <th>Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>flow.file.release.value</td> + <td>Files with this attribute are considered signals. The attribute + value is stored in a cache in the processor, and any held flow files + are released if their Release Signal Attribute (configured in the processor) + matches this value.</td> + </tr> + </tbody> + </table> + + <p> + <strong>Properties:</strong> + </p> + <p>In the list below, the names of required properties appear + in bold. Any other properties (not in bold) are considered optional. + If a property has a default value, it is indicated. If a property + supports the use of the NiFi Expression Language (or simply, + "expression language"), that is also indicated.</p> + <ul> + <li><strong>Release Signal Attribute</strong> + <ul> + <li>The flow file attribute name on held files that will be checked against the signal cache.</li> + <li>Default value: none</li> + <li>Supports expression language: false</li> + </ul> + </li> + <li><strong>Max Signal Age</strong> + <ul> + <li>The maximum age of a signal that will trigger a file to be released. + Signals are expired from the cache after this duration, and + any matching held files will be routed to the 'expired' relationship. + Expected format is <duration> <time unit> where <duration> is a positive + integer and <time unit> is one of seconds, minutes, hours</li> + <li>Default value: 24 hours</li> + <li>Supports expression language: false</li> + </ul></li> + <li><strong>Copy Signal Attributes?</strong> + <ul> + <li>If true, a signal's flow file attributes will be copied to its matching held files, + with the exception of flow.file.release.value and the configured Signal Failure Attribute.</li> + <li>Default value: true</li> + <li>Supports expression language: false</li> + </ul> + </li> + <li>Signal Failure Attribute + <ul> + <li>Signals that have this attribute set to 'true' + will cause matching held flow files to route to Failure. If this attribute + is not populated, it is assumed that the flow file succeeds. This allows + upstream failures to propagate to held files.</li> + <li>Default value: no default</li> + <li>Supports expression language: false</li> + </ul> + </li> + </ul> + + <p> + <strong>Relationships:</strong> + </p> + <ul> + <li>hold + <ul> + <li>Held files whose signals have not been received are routed here. The most common pattern is to + route this relationship back into the processor, and set it on a timer.</li> + </ul> + </li> + <li>release + <ul> + <li>Held files whose signals have been received are routed here.</li> + </ul> + </li> + <li>failure + <ul> + <li>Held files whose signal contains the Signal Failure Attribute are + routed here, indicating a processing failure upstream.</li> + </ul> + </li> + <li>expired + <ul> + <li>Held files that expire are routed here.</li> + </ul> + </li> + </ul> + + </body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java new file mode 100644 index 0000000..131466c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java @@ -0,0 +1,156 @@ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestHoldFile { + + private TestRunner testRunner; + + @Before + public void init() { + testRunner = TestRunners.newTestRunner(HoldFile.class); + } + + @Test + public void testHoldAndRelease() throws IOException { + testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours"); + testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, "identifier"); + testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed"); + testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "true"); + + // One signal file + Map<String, String> signal = new HashMap<>(); + signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567"); + signal.put("return.value", "a response"); + testRunner.enqueue("signal".getBytes(), signal); + + // Two normal flow files to be held + Map<String, String> metadata = new HashMap<>(); + metadata.put("identifier", "123456"); + + Map<String, String> metadata2 = new HashMap<>(); + metadata2.put("identifier", "1234567"); + + testRunner.enqueue("file1".getBytes(), metadata); + testRunner.enqueue("file2".getBytes(), metadata2); + + testRunner.run(); + + // One was held because it didn't match + testRunner.assertTransferCount(HoldFile.REL_HOLD, 1); + // The matching one was released + testRunner.assertTransferCount(HoldFile.REL_RELEASE, 1); + + // Make sure the propagated attribute from signal was added to the held flow file + MockFlowFile released = testRunner + .getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0); + assertEquals("Signal attributes were not copied to held file", + "a response", released.getAttribute("return.value")); + assertNull("flow.file.release.value should not be propagated", released + .getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE)); + + // None expired + testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0); + + // None failed + testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0); + } + + @Test + public void testHoldAndRelease_noCopy() throws IOException { + testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours"); + testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, "identifier"); + testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed"); + testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "false"); + + // One signal file + Map<String, String> signal = new HashMap<>(); + signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567"); + signal.put("return.value", "a response"); + testRunner.enqueue("signal".getBytes(), signal); + + // Two normal flow files to be held + Map<String, String> metadata = new HashMap<>(); + metadata.put("identifier", "123456"); + + Map<String, String> metadata2 = new HashMap<>(); + metadata2.put("identifier", "1234567"); + + testRunner.enqueue("file1".getBytes(), metadata); + testRunner.enqueue("file2".getBytes(), metadata2); + + testRunner.run(); + + // One was held because it didn't match + testRunner.assertTransferCount(HoldFile.REL_HOLD, 1); + // The matching one was released + testRunner.assertTransferCount(HoldFile.REL_RELEASE, 1); + + // Make sure the propagated attribute from signal was added to the held flow file + MockFlowFile released = testRunner + .getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0); + assertNull("Attributes were not supposed to be copied", released + .getAttribute("return.value")); + assertNull("flow.file.release.value should not be propagated", released + .getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE)); + + // None expired + testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0); + + // None failed + testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0); + } + + @Test + public void testHoldAndRelease_failure() throws IOException { + testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours"); + testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, "identifier"); + testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed"); + testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "false"); + + // One signal file + Map<String, String> signal = new HashMap<>(); + signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567"); + signal.put("return.value", "a response"); + signal.put("service.failed", "true"); + testRunner.enqueue("signal".getBytes(), signal); + + // Two normal flow files to be held + Map<String, String> metadata = new HashMap<>(); + metadata.put("identifier", "123456"); + + Map<String, String> metadata2 = new HashMap<>(); + metadata2.put("identifier", "1234567"); + + testRunner.enqueue("file1".getBytes(), metadata); + testRunner.enqueue("file2".getBytes(), metadata2); + + testRunner.run(); + + // One was held because it didn't match + testRunner.assertTransferCount(HoldFile.REL_HOLD, 1); + // The matching one was routed to failure because of the + // service.failed attribute + testRunner.assertTransferCount(HoldFile.REL_FAILURE, 1); + + // Make sure the propagated attribute from signal was added to the held flow file + MockFlowFile released = testRunner + .getFlowFilesForRelationship(HoldFile.REL_FAILURE).get(0); + assertNull("Attributes were not supposed to be copied", released + .getAttribute("return.value")); + + // None expired + testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0); + } +}
