NIFI-190 merged/resolved conflicts of 3 month old submission.  Still needs a 
thorough review


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/6061475e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/6061475e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/6061475e

Branch: refs/heads/NIFI-190
Commit: 6061475edecb80e1f132614ffb5fbbe45faad9ed
Parents: 8e1e355
Author: joewitt <[email protected]>
Authored: Tue Mar 17 00:35:43 2015 -0400
Committer: joewitt <[email protected]>
Committed: Tue Mar 17 00:35:43 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/HoldFile.java      | 285 -------------------
 .../index.html                                  | 158 ----------
 .../nifi/processors/standard/TestHoldFile.java  | 155 ----------
 .../nifi/processors/standard/HoldFile.java      | 285 +++++++++++++++++++
 .../index.html                                  | 158 ++++++++++
 .../nifi/processors/standard/TestHoldFile.java  | 156 ++++++++++
 6 files changed, 599 insertions(+), 598 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
 
b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
deleted file mode 100644
index b714a07..0000000
--- 
a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
+++ /dev/null
@@ -1,285 +0,0 @@
-package org.apache.nifi.processors.standard;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.Tags;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-
-@Tags({"hold", "release", "signal"})
-@CapabilityDescription("Holds incoming flow files until a matching signal flow 
file enters the processor.  "
-               + "Incoming files are classified as either held files or 
signals.  "
-               + "Held files are routed to the Held relationship until a 
matching signal has been received.")
-public class HoldFile extends AbstractProcessor {
-       public static final String FLOW_FILE_RELEASE_VALUE = 
"flow.file.release.value";
-       public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
-       
-       public static final PropertyDescriptor MAX_SIGNAL_AGE = new 
PropertyDescriptor
-                       .Builder().name("Max Signal Age")
-                       .description("The maximum age of a signal that will 
trigger a file to be released.  "
-                                       + "Expected format is <duration> <time 
unit> where <duration> is a positive "
-                                       + "integer and <time unit> is one of 
seconds, minutes, hours")
-                       .required(true).defaultValue("24 hours")
-                       
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, 
Integer.MAX_VALUE, TimeUnit.SECONDS))
-                       .build();
-       public static final PropertyDescriptor RELEASE_SIGNAL_ATTRIBUTE = new 
PropertyDescriptor
-                       .Builder().name("Release Signal Attribute")
-                       .description("The flow file attribute name on held 
files that will be checked against values in the signal cache.")
-                       .required(true).
-                       addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
-                       .build();
-       public static final PropertyDescriptor COPY_SIGNAL_ATTRIBUTES = new 
PropertyDescriptor
-                       .Builder().name("Copy Signal Attributes?")
-                       .description("If true, a signal's flow file attributes 
will be copied to its matching held files, "
-                                       + "with the exception of 
flow.file.release.value and the configured Signal Failure Attribute")
-                       .required(true)
-                       .defaultValue("true")
-                       .allowableValues("true", "false")
-                       .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-                       .build();
-       public static final PropertyDescriptor FAILURE_ATTRIBUTE = new 
PropertyDescriptor
-                       .Builder().name("Signal Failure Attribute")
-                       .description("Signals that have this attribute set to 
'true' "
-                                       + "will cause matching held flow files 
to route to Failure.  If this attribute "
-                                       + "is not populated, it is assumed that 
the flow file succeeds.")
-                       .required(false)
-                       
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
-                       .build();
-       
-       public static final Relationship REL_HOLD = new Relationship.Builder()
-                       .name("hold").description("Held files whose signals 
have not been received are routed here").build();
-
-       public static final Relationship REL_RELEASE = new 
Relationship.Builder()
-                       .name("release").description("Held files whose signals 
have been received are routed here").build();
-
-       public static final Relationship REL_EXPIRED = new 
Relationship.Builder()
-                       .name("expired").description("Held files that expire 
are routed here").build();
-
-       public static final Relationship REL_FAILURE = new 
Relationship.Builder()
-                       .name("failure").description("Held files whose signal 
contains the Signal Failure Attribute are "
-                                       + "routed here, indicating a processing 
failure upstream").build();
-       
-       private Set<String> excludedAttributes = new HashSet<>();
-
-       private Set<Relationship> relationships;
-       private List<PropertyDescriptor> descriptors;
-
-       private String failureAttribute;
-       private volatile Map<String, ReleaseAttributes> releaseValues = new 
ConcurrentHashMap<>();
-       private long expirationDuration = 0L;
-       private String releaseSignalAttribute;
-       private boolean copyAttributes;
-       
-
-       @Override
-       protected void init(final ProcessorInitializationContext context) {
-               final Set<Relationship> relationships = new HashSet<>();
-               relationships .add(REL_HOLD);
-               relationships .add(REL_RELEASE);
-               relationships .add(REL_EXPIRED);
-               relationships .add(REL_FAILURE);
-               this.relationships = Collections.unmodifiableSet(relationships);
-
-               final List<PropertyDescriptor> descriptors = new ArrayList<>();
-               descriptors.add(RELEASE_SIGNAL_ATTRIBUTE);
-               descriptors.add(MAX_SIGNAL_AGE);
-               descriptors.add(COPY_SIGNAL_ATTRIBUTES);
-               descriptors.add(FAILURE_ATTRIBUTE);
-               this.descriptors = Collections.unmodifiableList(descriptors);
-       }
-
-       @Override
-       public final Set<Relationship> getRelationships() { 
-               return relationships;
-       }
-
-       @Override
-       public final List<PropertyDescriptor> getSupportedPropertyDescriptors() 
{
-               return descriptors;
-       }
-
-       @OnScheduled
-       public void onScheduled(final ProcessContext context) {
-               this.expirationDuration = context.getProperty(MAX_SIGNAL_AGE)
-                               .asTimePeriod(TimeUnit.MILLISECONDS);
-               this.releaseSignalAttribute = context.getProperty(
-                               RELEASE_SIGNAL_ATTRIBUTE).getValue();
-               this.copyAttributes = context.getProperty(
-                               COPY_SIGNAL_ATTRIBUTES).asBoolean();
-               PropertyValue failureAttrValue = 
context.getProperty(FAILURE_ATTRIBUTE);
-               if (failureAttrValue != null) {
-                       this.failureAttribute = failureAttrValue.getValue();
-               }
-
-               this.excludedAttributes.clear();
-               this.excludedAttributes.add(FLOW_FILE_RELEASE_VALUE);
-               if (this.failureAttribute != null) {
-                       this.excludedAttributes.add(this.failureAttribute);
-               }
-       }
-
-       @Override
-       public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
-               int encountered = 0;
-               FlowFile flowFile = null;
-               while((flowFile = session.get()) != null) {
-                       String releaseValueAttribute = 
flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE);
-                       
-                       if ((releaseValueAttribute == null)
-                                       || 
(releaseValueAttribute.trim().length() == 0)) {
-                               this.processHoldFile(flowFile, session);
-                       } else {
-                               this.processSignal(flowFile, session);
-                       }
-                       encountered++;
-               }       
-       
-               if (!isScheduled()) { 
-                       return; 
-               }
-               if (encountered == 0) { 
-                       context.yield();
-               }
-       }
-
-       /**
-        * Stores a signal and all associated flow file attributes, as 
applicable.
-        * @param flowFile
-        * @param session
-        */
-       private void processSignal(FlowFile flowFile, ProcessSession session) {
-               ReleaseAttributes releaseAttributes = new ReleaseAttributes();
-               // Store any propagated signal attributes
-               if (this.copyAttributes) {
-                       
releaseAttributes.attributes.putAll(flowFile.getAttributes());
-               }
-
-               // Check if the signal indicates a failure upstream
-               String failureValue = flowFile.getAttribute(failureAttribute);
-               releaseAttributes.failed = 
"true".equalsIgnoreCase(failureValue);
-               String releaseValue = 
flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE);
-               releaseValues.put(releaseValue , releaseAttributes);
-               if (getLogger().isDebugEnabled()) {
-                       getLogger().debug("{} is marking flow files with {}={} 
for release", 
-                                       new Object[] {flowFile, 
releaseSignalAttribute, releaseValue });
-               }
-               
-               session.remove(flowFile);
-       }
-
-       /**
-        * Transfers the held file to either Release (if a release signal has
-        * been found), Hold (if not), or Expired (if it has expired).
-        * @param flowFile
-        * @param session
-        */
-       private void processHoldFile(FlowFile flowFile, ProcessSession session) 
{
-               String attributeValue = 
flowFile.getAttribute(releaseSignalAttribute);
-
-               // Do we have a matching attribute to be released?
-               if (attributeValue != null && 
releaseValues.containsKey(attributeValue)) {
-                       ReleaseAttributes releaseAttributes = releaseValues
-                                       .get(attributeValue);
-                       boolean copyAttributes = 
(!releaseAttributes.attributes.isEmpty() && this.copyAttributes);
-                       if (copyAttributes) {
-                               Map<String, String> signalAttributes = 
getNewAttributes(
-                                               flowFile.getAttributes(), 
releaseAttributes.attributes,
-                                               this.excludedAttributes);
-                               flowFile = session.putAllAttributes(flowFile, 
signalAttributes);
-                               flowFile = session.removeAllAttributes(flowFile,
-                                               this.excludedAttributes);
-                       }
-                       if (getLogger().isDebugEnabled()) {
-                               getLogger().debug("{} was released.  Attributes 
copied? {}",
-                                               new Object[] { flowFile, 
copyAttributes });
-                       }
-                       // Remove the signal
-                       synchronized (releaseValues) {
-                               releaseValues.remove(attributeValue);
-                               if (releaseAttributes.failed) {
-                                       getLogger()
-                                                       .warn("Received a 
non-success value for {}, routing to failure: {}",
-                                                                       new 
Object[] { failureAttribute, flowFile });
-                                       session.transfer(flowFile, REL_FAILURE);
-                               } else {
-                                       session.transfer(flowFile, REL_RELEASE);
-                               }
-                       }
-               } else {
-                       // We don't have a signal yet. Let's check if it has 
expired
-                       long entryDate = flowFile.getEntryDate();
-                       if (isExpired(entryDate)) {
-                               session.transfer(flowFile, REL_EXPIRED);
-                               // It expired. We likely have some expired 
signals too, so let's
-                               // check
-                               synchronized (releaseValues) {
-                                       for (Iterator<Entry<String, 
ReleaseAttributes>> it = releaseValues
-                                                       .entrySet().iterator(); 
it.hasNext();) {
-                                               Entry<String, 
ReleaseAttributes> entry = it.next();
-                                               ReleaseAttributes 
releaseAttributes = entry.getValue();
-                                               if 
(isExpired(releaseAttributes.creationTimestamp)) {
-                                                       
releaseAttributes.attributes.clear();
-                                                       it.remove();
-                                               }
-                                       }
-                               }
-                               if (getLogger().isDebugEnabled()) {
-                                       getLogger().debug("Expiring {}", new 
Object[] { flowFile });
-                               }
-                       } else {
-                               // It hasn't expired, so transfer to Hold
-                               session.transfer(flowFile, REL_HOLD);
-                               if (getLogger().isTraceEnabled()) {
-                                       getLogger().trace("Holding {}", new 
Object[] { flowFile });
-                               }
-                       }
-               }
-       }
-
-       // Compares two maps and returns only those attributes that are new
-       private static Map<String, String> getNewAttributes(
-                       Map<String, String> existingAttributes,
-                       Map<String, String> attributesToAdd, Set<String> 
alwaysOverrideKeys) {
-               Map<String, String> newAttributes = new HashMap<>();
-               for (Entry<String, String> entryToAdd : 
attributesToAdd.entrySet()) {
-                       if (!existingAttributes.containsKey(entryToAdd.getKey())
-                                       || 
alwaysOverrideKeys.contains(entryToAdd.getKey())) {
-                               newAttributes.put(entryToAdd.getKey(), 
entryToAdd.getValue());
-                       }
-               }
-               return newAttributes;
-       }
-
-       // Returns true if the given time is far enough in the past to expire
-       private boolean isExpired(Long creationTimestamp) {
-               return creationTimestamp + expirationDuration < 
System.currentTimeMillis();
-       }
-
-       public static class ReleaseAttributes {
-               private long creationTimestamp = System.currentTimeMillis();
-               private boolean failed;
-               private Map<String, String> attributes = new HashMap<>();
-       }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
 
b/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
deleted file mode 100644
index 221c8cb..0000000
--- 
a/nar-bundles/standard-bundle/standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
+++ /dev/null
@@ -1,158 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-    <!--
-      Licensed to the Apache Software Foundation (ASF) under one or more
-      contributor license agreements.  See the NOTICE file distributed with
-      this work for additional information regarding copyright ownership.
-      The ASF licenses this file to You under the Apache License, Version 2.0
-      (the "License"); you may not use this file except in compliance with
-      the License.  You may obtain a copy of the License at
-          http://www.apache.org/licenses/LICENSE-2.0
-      Unless required by applicable law or agreed to in writing, software
-      distributed under the License is distributed on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-      See the License for the specific language governing permissions and
-      limitations under the License.
-    -->
-    <head>
-        <meta charset="utf-8" />
-        <title>HoldFile</title>
-
-
-        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
-    </head>
-
-    <body>
-
-        <!-- Processor Documentation 
================================================== -->
-        <h2>Description:</h2>
-        <p>This processor holds incoming flow files until a matching signal 
-        flow file enters the processor.  Incoming files are classified as 
either 
-        held files or signals.  Held files are routed to the Held relationship 
-        until a matching signal has been received.
-        </p>
-
-               <p>This processor primarily supports the following use case:
-               <ul>
-                       <li>A file in format A needs to be sent to Endpoint 
A</li>
-                       <li>The same file in format B needs to be sent to 
Endpoint B, 
-                               but should not proceed until A has reached 
Endpoint A.  
-                               This restriction is most common when Endpoint B 
-                               requires some output of Endpoint A.</li>
-               </ul>
-               </p>
-               <p>Signal files are distinguished from held files by the 
presence of the 
-               "flow.file.release.value" attribute on the signal files.  When 
a signal
-               file enters, its value is cached in the processor.  The 
processor is also
-               configured with a "Release Signal Attribute".  Held files with 
this 
-               attribute whose value matches a received signal value will be 
released.</p>
-
-               <p>An example:  HoldFile is configured with Release Signal 
Attribute = "myId".  
-               Its 'hold' relationship routes back onto itself.
-               <ol>
-                       <li>flowFile 1 { myId : "123" } enters HoldFile.  It is 
routed to the 'hold' relationship.</li>
-                       <li>flowFile 2 { flow.file.release.value : "123" } 
enters HoldFile.  
-                       Flowfile 1 is then routed to 'release'.</li>
-               </ol>
-
-               <p>Signal flow files will also copy their attributes to 
matching held files, 
-               unless otherwise indicated.  This is what allows the output of 
-               Endpoint A to pass to Endpoint B, above.
-               </p>
-        <p>
-            <strong>Uses Attributes:</strong>
-        </p>
-        <table border="1">
-            <thead>
-                <tr>
-                    <th>Attribute Name</th>
-                    <th>Description</th>
-                </tr>
-            </thead>
-            <tbody>
-                <tr>
-                    <td>flow.file.release.value</td>
-                    <td>Files with this attribute are considered signals.  The 
attribute
-                    value is stored in a cache in the processor, and any held 
flow files
-                    are released if their Release Signal Attribute (configured 
in the processor)
-                    matches this value.</td>
-                </tr>
-            </tbody>
-        </table>
-
-        <p>
-            <strong>Properties:</strong>
-        </p>
-        <p>In the list below, the names of required properties appear
-            in bold. Any other properties (not in bold) are considered 
optional.
-            If a property has a default value, it is indicated. If a property
-            supports the use of the NiFi Expression Language (or simply,
-            "expression language"), that is also indicated.</p>
-        <ul>
-            <li><strong>Release Signal Attribute</strong>
-                <ul>
-                    <li>The flow file attribute name on held files that will 
be checked against the signal cache.</li>
-                    <li>Default value: none</li>
-                    <li>Supports expression language: false</li>
-                </ul>
-            </li>
-            <li><strong>Max Signal Age</strong>
-                <ul>
-                    <li>The maximum age of a signal that will trigger a file 
to be released.
-                    Signals are expired from the cache after this duration, and
-                    any matching held files will be routed to the 'expired' 
relationship.
-                                       Expected format is <duration> <time 
unit> where <duration> is a positive 
-                                       integer and <time unit> is one of 
seconds, minutes, hours</li>
-                    <li>Default value: 24 hours</li>
-                    <li>Supports expression language: false</li>
-                </ul></li>
-            <li><strong>Copy Signal Attributes?</strong>
-                <ul>
-                    <li>If true, a signal's flow file attributes will be 
copied to its matching held files, 
-                                       with the exception of 
flow.file.release.value and the configured Signal Failure Attribute.</li>
-                    <li>Default value: true</li>
-                    <li>Supports expression language: false</li>
-                </ul>
-            </li>
-            <li>Signal Failure Attribute
-                <ul>
-                    <li>Signals that have this attribute set to 'true' 
-                                       will cause matching held flow files to 
route to Failure.  If this attribute 
-                                       is not populated, it is assumed that 
the flow file succeeds.  This allows
-                                       upstream failures to propagate to held 
files.</li>
-                    <li>Default value: no default</li>
-                    <li>Supports expression language: false</li>
-                </ul>
-            </li>
-        </ul>
-
-        <p>
-            <strong>Relationships:</strong>
-        </p>
-        <ul>
-            <li>hold
-                <ul>
-                    <li>Held files whose signals have not been received are 
routed here.  The most common pattern is to
-                    route this relationship back into the processor, and set 
it on a timer.</li>
-                </ul>
-            </li>
-            <li>release
-                <ul>
-                    <li>Held files whose signals have been received are routed 
here.</li>
-                </ul>
-            </li>
-            <li>failure
-                <ul>
-                    <li>Held files whose signal contains the Signal Failure 
Attribute are 
-                    routed here, indicating a processing failure upstream.</li>
-                </ul>
-            </li>
-            <li>expired
-                <ul>
-                    <li>Held files that expire are routed here.</li>
-                </ul>
-            </li>
-        </ul>
-
-    </body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
 
b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
deleted file mode 100644
index f7b2678..0000000
--- 
a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package org.apache.nifi.processors.standard;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestHoldFile {
-       private TestRunner testRunner;
-
-       @Before
-       public void init() {
-               testRunner = TestRunners.newTestRunner(HoldFile.class);
-       }
-
-       @Test
-       public void testHoldAndRelease() throws IOException {
-               testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours");
-               testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, 
"identifier");
-               testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, 
"service.failed");
-               testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "true");
-               
-               // One signal file
-               Map<String, String> signal = new HashMap<String, String>();
-               signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567");
-               signal.put("return.value", "a response");
-               testRunner.enqueue("signal".getBytes(), signal);
-
-               // Two normal flow files to be held
-               Map<String, String> metadata = new HashMap<String, String>();
-               metadata.put("identifier", "123456");
-
-               Map<String, String> metadata2 = new HashMap<String, String>();
-               metadata2.put("identifier", "1234567");
-               
-               testRunner.enqueue("file1".getBytes(), metadata);
-               testRunner.enqueue("file2".getBytes(), metadata2);
-
-               testRunner.run();
-               
-               // One was held because it didn't match
-               testRunner.assertTransferCount(HoldFile.REL_HOLD, 1);
-               // The matching one was released
-               testRunner.assertTransferCount(HoldFile. REL_RELEASE, 1);
-
-               // Make sure the propagated attribute from signal was added to 
the held flow file
-               MockFlowFile released = testRunner
-                               
.getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0);
-               assertEquals("Signal attributes were not copied to held file", 
-                               "a response", 
released.getAttribute("return.value"));
-               assertNull("flow.file.release.value should not be propagated", 
released
-                               
.getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE));
-
-               // None expired
-               testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0);
-
-               // None failed
-               testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0);
-       }
-
-       @Test
-       public void testHoldAndRelease_noCopy() throws IOException {
-               testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours");
-               testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, 
"identifier");
-               testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, 
"service.failed");
-               testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, 
"false");
-               
-               // One signal file
-               Map<String, String> signal = new HashMap<String, String>();
-               signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567");
-               signal.put("return.value", "a response");
-               testRunner.enqueue("signal".getBytes(), signal);
-
-               // Two normal flow files to be held
-               Map<String, String> metadata = new HashMap<String, String>();
-               metadata.put("identifier", "123456");
-
-               Map<String, String> metadata2 = new HashMap<String, String>();
-               metadata2.put("identifier", "1234567");
-               
-               testRunner.enqueue("file1".getBytes(), metadata);
-               testRunner.enqueue("file2".getBytes(), metadata2);
-
-               testRunner.run();
-               
-               // One was held because it didn't match
-               testRunner.assertTransferCount(HoldFile.REL_HOLD, 1);
-               // The matching one was released
-               testRunner.assertTransferCount(HoldFile. REL_RELEASE, 1);
-
-               // Make sure the propagated attribute from signal was added to 
the held flow file
-               MockFlowFile released = testRunner
-                               
.getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0);
-               assertNull("Attributes were not supposed to be copied", released
-                               .getAttribute("return.value"));
-               assertNull("flow.file.release.value should not be propagated", 
released
-                               
.getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE));
-
-               // None expired
-               testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0);
-
-               // None failed
-               testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0);
-       }
-       
-       @Test
-       public void testHoldAndRelease_failure() throws IOException {
-               testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours");
-               testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, 
"identifier");
-               testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, 
"service.failed");
-               testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, 
"false");
-               
-               // One signal file
-               Map<String, String> signal = new HashMap<String, String>();
-               signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567");
-               signal.put("return.value", "a response");
-               signal.put("service.failed", "true");
-               testRunner.enqueue("signal".getBytes(), signal);
-
-               // Two normal flow files to be held
-               Map<String, String> metadata = new HashMap<String, String>();
-               metadata.put("identifier", "123456");
-
-               Map<String, String> metadata2 = new HashMap<String, String>();
-               metadata2.put("identifier", "1234567");
-               
-               testRunner.enqueue("file1".getBytes(), metadata);
-               testRunner.enqueue("file2".getBytes(), metadata2);
-
-               testRunner.run();
-               
-               // One was held because it didn't match
-               testRunner.assertTransferCount(HoldFile.REL_HOLD, 1);
-               // The matching one was routed to failure because of the
-               // service.failed attribute
-               testRunner.assertTransferCount(HoldFile. REL_FAILURE, 1);
-
-               // Make sure the propagated attribute from signal was added to 
the held flow file
-               MockFlowFile released = testRunner
-                               
.getFlowFilesForRelationship(HoldFile.REL_FAILURE).get(0);
-               assertNull("Attributes were not supposed to be copied", released
-                               .getAttribute("return.value"));
-
-               // None expired
-               testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
new file mode 100644
index 0000000..5dbaa06
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HoldFile.java
@@ -0,0 +1,285 @@
+package org.apache.nifi.processors.standard;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@Tags({"hold", "release", "signal"})
+@CapabilityDescription("Holds incoming flow files until a matching signal flow 
file enters the processor.  "
+               + "Incoming files are classified as either held files or 
signals.  "
+               + "Held files are routed to the Held relationship until a 
matching signal has been received.")
+public class HoldFile extends AbstractProcessor {
+       public static final String FLOW_FILE_RELEASE_VALUE = 
"flow.file.release.value";
+       public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+       
+       public static final PropertyDescriptor MAX_SIGNAL_AGE = new 
PropertyDescriptor
+                       .Builder().name("Max Signal Age")
+                       .description("The maximum age of a signal that will 
trigger a file to be released.  "
+                                       + "Expected format is <duration> <time 
unit> where <duration> is a positive "
+                                       + "integer and <time unit> is one of 
seconds, minutes, hours")
+                       .required(true).defaultValue("24 hours")
+                       
.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, 
Integer.MAX_VALUE, TimeUnit.SECONDS))
+                       .build();
+       public static final PropertyDescriptor RELEASE_SIGNAL_ATTRIBUTE = new 
PropertyDescriptor
+                       .Builder().name("Release Signal Attribute")
+                       .description("The flow file attribute name on held 
files that will be checked against values in the signal cache.")
+                       .required(true).
+                       addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+                       .build();
+       public static final PropertyDescriptor COPY_SIGNAL_ATTRIBUTES = new 
PropertyDescriptor
+                       .Builder().name("Copy Signal Attributes?")
+                       .description("If true, a signal's flow file attributes 
will be copied to its matching held files, "
+                                       + "with the exception of 
flow.file.release.value and the configured Signal Failure Attribute")
+                       .required(true)
+                       .defaultValue("true")
+                       .allowableValues("true", "false")
+                       .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                       .build();
+       public static final PropertyDescriptor FAILURE_ATTRIBUTE = new 
PropertyDescriptor
+                       .Builder().name("Signal Failure Attribute")
+                       .description("Signals that have this attribute set to 
'true' "
+                                       + "will cause matching held flow files 
to route to Failure.  If this attribute "
+                                       + "is not populated, it is assumed that 
the flow file succeeds.")
+                       .required(false)
+                       
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+                       .build();
+       
+       public static final Relationship REL_HOLD = new Relationship.Builder()
+                       .name("hold").description("Held files whose signals 
have not been received are routed here").build();
+
+       public static final Relationship REL_RELEASE = new 
Relationship.Builder()
+                       .name("release").description("Held files whose signals 
have been received are routed here").build();
+
+       public static final Relationship REL_EXPIRED = new 
Relationship.Builder()
+                       .name("expired").description("Held files that expire 
are routed here").build();
+
+       public static final Relationship REL_FAILURE = new 
Relationship.Builder()
+                       .name("failure").description("Held files whose signal 
contains the Signal Failure Attribute are "
+                                       + "routed here, indicating a processing 
failure upstream").build();
+       
+       private final Set<String> excludedAttributes = new HashSet<>();
+
+       private Set<Relationship> relationships;
+       private List<PropertyDescriptor> descriptors;
+
+       private String failureAttribute;
+       private volatile Map<String, ReleaseAttributes> releaseValues = new 
ConcurrentHashMap<>();
+       private long expirationDuration = 0L;
+       private String releaseSignalAttribute;
+       private boolean copyAttributes;
+       
+
+       @Override
+       protected void init(final ProcessorInitializationContext context) {
+               final Set<Relationship> relationships = new HashSet<>();
+               relationships .add(REL_HOLD);
+               relationships .add(REL_RELEASE);
+               relationships .add(REL_EXPIRED);
+               relationships .add(REL_FAILURE);
+               this.relationships = Collections.unmodifiableSet(relationships);
+
+               final List<PropertyDescriptor> descriptors = new ArrayList<>();
+               descriptors.add(RELEASE_SIGNAL_ATTRIBUTE);
+               descriptors.add(MAX_SIGNAL_AGE);
+               descriptors.add(COPY_SIGNAL_ATTRIBUTES);
+               descriptors.add(FAILURE_ATTRIBUTE);
+               this.descriptors = Collections.unmodifiableList(descriptors);
+       }
+
+       @Override
+       public final Set<Relationship> getRelationships() { 
+               return relationships;
+       }
+
+       @Override
+       public final List<PropertyDescriptor> getSupportedPropertyDescriptors() 
{
+               return descriptors;
+       }
+
+       @OnScheduled
+       public void onScheduled(final ProcessContext context) {
+               this.expirationDuration = context.getProperty(MAX_SIGNAL_AGE)
+                               .asTimePeriod(TimeUnit.MILLISECONDS);
+               this.releaseSignalAttribute = context.getProperty(
+                               RELEASE_SIGNAL_ATTRIBUTE).getValue();
+               this.copyAttributes = context.getProperty(
+                               COPY_SIGNAL_ATTRIBUTES).asBoolean();
+               PropertyValue failureAttrValue = 
context.getProperty(FAILURE_ATTRIBUTE);
+               if (failureAttrValue != null) {
+                       this.failureAttribute = failureAttrValue.getValue();
+               }
+
+               this.excludedAttributes.clear();
+               this.excludedAttributes.add(FLOW_FILE_RELEASE_VALUE);
+               if (this.failureAttribute != null) {
+                       this.excludedAttributes.add(this.failureAttribute);
+               }
+       }
+
+       @Override
+       public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+               int encountered = 0;
+               FlowFile flowFile = null;
+               while((flowFile = session.get()) != null) {
+                       String releaseValueAttribute = 
flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE);
+                       
+                       if ((releaseValueAttribute == null)
+                                       || 
(releaseValueAttribute.trim().length() == 0)) {
+                               this.processHoldFile(flowFile, session);
+                       } else {
+                               this.processSignal(flowFile, session);
+                       }
+                       encountered++;
+               }       
+       
+               if (!isScheduled()) { 
+                       return; 
+               }
+               if (encountered == 0) { 
+                       context.yield();
+               }
+       }
+
+       /**
+        * Stores a signal and all associated flow file attributes, as 
applicable.
+        * @param flowFile
+        * @param session
+        */
+       private void processSignal(FlowFile flowFile, ProcessSession session) {
+               ReleaseAttributes releaseAttributes = new ReleaseAttributes();
+               // Store any propagated signal attributes
+               if (this.copyAttributes) {
+                       
releaseAttributes.attributes.putAll(flowFile.getAttributes());
+               }
+
+               // Check if the signal indicates a failure upstream
+               String failureValue = flowFile.getAttribute(failureAttribute);
+               releaseAttributes.failed = 
"true".equalsIgnoreCase(failureValue);
+               String releaseValue = 
flowFile.getAttribute(FLOW_FILE_RELEASE_VALUE);
+               releaseValues.put(releaseValue , releaseAttributes);
+               if (getLogger().isDebugEnabled()) {
+                       getLogger().debug("{} is marking flow files with {}={} 
for release", 
+                                       new Object[] {flowFile, 
releaseSignalAttribute, releaseValue });
+               }
+               
+               session.remove(flowFile);
+       }
+
+       /**
+        * Transfers the held file to either Release (if a release signal has
+        * been found), Hold (if not), or Expired (if it has expired).
+        * @param flowFile
+        * @param session
+        */
+       private void processHoldFile(FlowFile flowFile, ProcessSession session) 
{
+               String attributeValue = 
flowFile.getAttribute(releaseSignalAttribute);
+
+               // Do we have a matching attribute to be released?
+               if (attributeValue != null && 
releaseValues.containsKey(attributeValue)) {
+                       ReleaseAttributes releaseAttributes = releaseValues
+                                       .get(attributeValue);
+                       boolean copyAttributes = 
(!releaseAttributes.attributes.isEmpty() && this.copyAttributes);
+                       if (copyAttributes) {
+                               Map<String, String> signalAttributes = 
getNewAttributes(
+                                               flowFile.getAttributes(), 
releaseAttributes.attributes,
+                                               this.excludedAttributes);
+                               flowFile = session.putAllAttributes(flowFile, 
signalAttributes);
+                               flowFile = session.removeAllAttributes(flowFile,
+                                               this.excludedAttributes);
+                       }
+                       if (getLogger().isDebugEnabled()) {
+                               getLogger().debug("{} was released.  Attributes 
copied? {}",
+                                               new Object[] { flowFile, 
copyAttributes });
+                       }
+                       // Remove the signal
+                       synchronized (releaseValues) {
+                               releaseValues.remove(attributeValue);
+                               if (releaseAttributes.failed) {
+                                       getLogger()
+                                                       .warn("Received a 
non-success value for {}, routing to failure: {}",
+                                                                       new 
Object[] { failureAttribute, flowFile });
+                                       session.transfer(flowFile, REL_FAILURE);
+                               } else {
+                                       session.transfer(flowFile, REL_RELEASE);
+                               }
+                       }
+               } else {
+                       // We don't have a signal yet. Let's check if it has 
expired
+                       long entryDate = flowFile.getEntryDate();
+                       if (isExpired(entryDate)) {
+                               session.transfer(flowFile, REL_EXPIRED);
+                               // It expired. We likely have some expired 
signals too, so let's
+                               // check
+                               synchronized (releaseValues) {
+                                       for (Iterator<Entry<String, 
ReleaseAttributes>> it = releaseValues
+                                                       .entrySet().iterator(); 
it.hasNext();) {
+                                               Entry<String, 
ReleaseAttributes> entry = it.next();
+                                               ReleaseAttributes 
releaseAttributes = entry.getValue();
+                                               if 
(isExpired(releaseAttributes.creationTimestamp)) {
+                                                       
releaseAttributes.attributes.clear();
+                                                       it.remove();
+                                               }
+                                       }
+                               }
+                               if (getLogger().isDebugEnabled()) {
+                                       getLogger().debug("Expiring {}", new 
Object[] { flowFile });
+                               }
+                       } else {
+                               // It hasn't expired, so transfer to Hold
+                               session.transfer(flowFile, REL_HOLD);
+                               if (getLogger().isTraceEnabled()) {
+                                       getLogger().trace("Holding {}", new 
Object[] { flowFile });
+                               }
+                       }
+               }
+       }
+
+       // Compares two maps and returns only those attributes that are new
+       private static Map<String, String> getNewAttributes(
+                       Map<String, String> existingAttributes,
+                       Map<String, String> attributesToAdd, Set<String> 
alwaysOverrideKeys) {
+               Map<String, String> newAttributes = new HashMap<>();
+               for (Entry<String, String> entryToAdd : 
attributesToAdd.entrySet()) {
+                       if (!existingAttributes.containsKey(entryToAdd.getKey())
+                                       || 
alwaysOverrideKeys.contains(entryToAdd.getKey())) {
+                               newAttributes.put(entryToAdd.getKey(), 
entryToAdd.getValue());
+                       }
+               }
+               return newAttributes;
+       }
+
+       // Returns true if the given time is far enough in the past to expire
+       private boolean isExpired(Long creationTimestamp) {
+               return creationTimestamp + expirationDuration < 
System.currentTimeMillis();
+       }
+
+       public static class ReleaseAttributes {
+               private long creationTimestamp = System.currentTimeMillis();
+               private boolean failed;
+               private Map<String, String> attributes = new HashMap<>();
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
new file mode 100644
index 0000000..221c8cb
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.HoldFile/index.html
@@ -0,0 +1,158 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>HoldFile</title>
+
+
+        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+
+        <!-- Processor Documentation 
================================================== -->
+        <h2>Description:</h2>
+        <p>This processor holds incoming flow files until a matching signal 
+        flow file enters the processor.  Incoming files are classified as 
either 
+        held files or signals.  Held files are routed to the Held relationship 
+        until a matching signal has been received.
+        </p>
+
+               <p>This processor primarily supports the following use case:
+               <ul>
+                       <li>A file in format A needs to be sent to Endpoint 
A</li>
+                       <li>The same file in format B needs to be sent to 
Endpoint B, 
+                               but should not proceed until A has reached 
Endpoint A.  
+                               This restriction is most common when Endpoint B 
+                               requires some output of Endpoint A.</li>
+               </ul>
+               </p>
+               <p>Signal files are distinguished from held files by the 
presence of the 
+               "flow.file.release.value" attribute on the signal files.  When 
a signal
+               file enters, its value is cached in the processor.  The 
processor is also
+               configured with a "Release Signal Attribute".  Held files with 
this 
+               attribute whose value matches a received signal value will be 
released.</p>
+
+               <p>An example:  HoldFile is configured with Release Signal 
Attribute = "myId".  
+               Its 'hold' relationship routes back onto itself.
+               <ol>
+                       <li>flowFile 1 { myId : "123" } enters HoldFile.  It is 
routed to the 'hold' relationship.</li>
+                       <li>flowFile 2 { flow.file.release.value : "123" } 
enters HoldFile.  
+                       Flowfile 1 is then routed to 'release'.</li>
+               </ol>
+
+               <p>Signal flow files will also copy their attributes to 
matching held files, 
+               unless otherwise indicated.  This is what allows the output of 
+               Endpoint A to pass to Endpoint B, above.
+               </p>
+        <p>
+            <strong>Uses Attributes:</strong>
+        </p>
+        <table border="1">
+            <thead>
+                <tr>
+                    <th>Attribute Name</th>
+                    <th>Description</th>
+                </tr>
+            </thead>
+            <tbody>
+                <tr>
+                    <td>flow.file.release.value</td>
+                    <td>Files with this attribute are considered signals.  The 
attribute
+                    value is stored in a cache in the processor, and any held 
flow files
+                    are released if their Release Signal Attribute (configured 
in the processor)
+                    matches this value.</td>
+                </tr>
+            </tbody>
+        </table>
+
+        <p>
+            <strong>Properties:</strong>
+        </p>
+        <p>In the list below, the names of required properties appear
+            in bold. Any other properties (not in bold) are considered 
optional.
+            If a property has a default value, it is indicated. If a property
+            supports the use of the NiFi Expression Language (or simply,
+            "expression language"), that is also indicated.</p>
+        <ul>
+            <li><strong>Release Signal Attribute</strong>
+                <ul>
+                    <li>The flow file attribute name on held files that will 
be checked against the signal cache.</li>
+                    <li>Default value: none</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Max Signal Age</strong>
+                <ul>
+                    <li>The maximum age of a signal that will trigger a file 
to be released.
+                    Signals are expired from the cache after this duration, and
+                    any matching held files will be routed to the 'expired' 
relationship.
+                                       Expected format is <duration> <time 
unit> where <duration> is a positive 
+                                       integer and <time unit> is one of 
seconds, minutes, hours</li>
+                    <li>Default value: 24 hours</li>
+                    <li>Supports expression language: false</li>
+                </ul></li>
+            <li><strong>Copy Signal Attributes?</strong>
+                <ul>
+                    <li>If true, a signal's flow file attributes will be 
copied to its matching held files, 
+                                       with the exception of 
flow.file.release.value and the configured Signal Failure Attribute.</li>
+                    <li>Default value: true</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li>Signal Failure Attribute
+                <ul>
+                    <li>Signals that have this attribute set to 'true' 
+                                       will cause matching held flow files to 
route to Failure.  If this attribute 
+                                       is not populated, it is assumed that 
the flow file succeeds.  This allows
+                                       upstream failures to propagate to held 
files.</li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+        </ul>
+
+        <p>
+            <strong>Relationships:</strong>
+        </p>
+        <ul>
+            <li>hold
+                <ul>
+                    <li>Held files whose signals have not been received are 
routed here.  The most common pattern is to
+                    route this relationship back into the processor, and set 
it on a timer.</li>
+                </ul>
+            </li>
+            <li>release
+                <ul>
+                    <li>Held files whose signals have been received are routed 
here.</li>
+                </ul>
+            </li>
+            <li>failure
+                <ul>
+                    <li>Held files whose signal contains the Signal Failure 
Attribute are 
+                    routed here, indicating a processing failure upstream.</li>
+                </ul>
+            </li>
+            <li>expired
+                <ul>
+                    <li>Held files that expire are routed here.</li>
+                </ul>
+            </li>
+        </ul>
+
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6061475e/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
new file mode 100644
index 0000000..131466c
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHoldFile.java
@@ -0,0 +1,156 @@
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHoldFile {
+
+    private TestRunner testRunner;
+
+    @Before
+    public void init() {
+        testRunner = TestRunners.newTestRunner(HoldFile.class);
+    }
+
+    @Test
+    public void testHoldAndRelease() throws IOException {
+        testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours");
+        testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, 
"identifier");
+        testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed");
+        testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "true");
+
+        // One signal file
+        Map<String, String> signal = new HashMap<>();
+        signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567");
+        signal.put("return.value", "a response");
+        testRunner.enqueue("signal".getBytes(), signal);
+
+        // Two normal flow files to be held
+        Map<String, String> metadata = new HashMap<>();
+        metadata.put("identifier", "123456");
+
+        Map<String, String> metadata2 = new HashMap<>();
+        metadata2.put("identifier", "1234567");
+
+        testRunner.enqueue("file1".getBytes(), metadata);
+        testRunner.enqueue("file2".getBytes(), metadata2);
+
+        testRunner.run();
+
+        // One was held because it didn't match
+        testRunner.assertTransferCount(HoldFile.REL_HOLD, 1);
+        // The matching one was released
+        testRunner.assertTransferCount(HoldFile.REL_RELEASE, 1);
+
+        // Make sure the propagated attribute from signal was added to the 
held flow file
+        MockFlowFile released = testRunner
+                .getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0);
+        assertEquals("Signal attributes were not copied to held file",
+                "a response", released.getAttribute("return.value"));
+        assertNull("flow.file.release.value should not be propagated", released
+                .getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE));
+
+        // None expired
+        testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0);
+
+        // None failed
+        testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testHoldAndRelease_noCopy() throws IOException {
+        testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours");
+        testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, 
"identifier");
+        testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed");
+        testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "false");
+
+        // One signal file
+        Map<String, String> signal = new HashMap<>();
+        signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567");
+        signal.put("return.value", "a response");
+        testRunner.enqueue("signal".getBytes(), signal);
+
+        // Two normal flow files to be held
+        Map<String, String> metadata = new HashMap<>();
+        metadata.put("identifier", "123456");
+
+        Map<String, String> metadata2 = new HashMap<>();
+        metadata2.put("identifier", "1234567");
+
+        testRunner.enqueue("file1".getBytes(), metadata);
+        testRunner.enqueue("file2".getBytes(), metadata2);
+
+        testRunner.run();
+
+        // One was held because it didn't match
+        testRunner.assertTransferCount(HoldFile.REL_HOLD, 1);
+        // The matching one was released
+        testRunner.assertTransferCount(HoldFile.REL_RELEASE, 1);
+
+        // Make sure the propagated attribute from signal was added to the 
held flow file
+        MockFlowFile released = testRunner
+                .getFlowFilesForRelationship(HoldFile.REL_RELEASE).get(0);
+        assertNull("Attributes were not supposed to be copied", released
+                .getAttribute("return.value"));
+        assertNull("flow.file.release.value should not be propagated", released
+                .getAttribute(HoldFile.FLOW_FILE_RELEASE_VALUE));
+
+        // None expired
+        testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0);
+
+        // None failed
+        testRunner.assertTransferCount(HoldFile.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testHoldAndRelease_failure() throws IOException {
+        testRunner.setProperty(HoldFile.MAX_SIGNAL_AGE, "24 hours");
+        testRunner.setProperty(HoldFile.RELEASE_SIGNAL_ATTRIBUTE, 
"identifier");
+        testRunner.setProperty(HoldFile.FAILURE_ATTRIBUTE, "service.failed");
+        testRunner.setProperty(HoldFile.COPY_SIGNAL_ATTRIBUTES, "false");
+
+        // One signal file
+        Map<String, String> signal = new HashMap<>();
+        signal.put(HoldFile.FLOW_FILE_RELEASE_VALUE, "1234567");
+        signal.put("return.value", "a response");
+        signal.put("service.failed", "true");
+        testRunner.enqueue("signal".getBytes(), signal);
+
+        // Two normal flow files to be held
+        Map<String, String> metadata = new HashMap<>();
+        metadata.put("identifier", "123456");
+
+        Map<String, String> metadata2 = new HashMap<>();
+        metadata2.put("identifier", "1234567");
+
+        testRunner.enqueue("file1".getBytes(), metadata);
+        testRunner.enqueue("file2".getBytes(), metadata2);
+
+        testRunner.run();
+
+        // One was held because it didn't match
+        testRunner.assertTransferCount(HoldFile.REL_HOLD, 1);
+               // The matching one was routed to failure because of the
+        // service.failed attribute
+        testRunner.assertTransferCount(HoldFile.REL_FAILURE, 1);
+
+        // Make sure the propagated attribute from signal was added to the 
held flow file
+        MockFlowFile released = testRunner
+                .getFlowFilesForRelationship(HoldFile.REL_FAILURE).get(0);
+        assertNull("Attributes were not supposed to be copied", released
+                .getAttribute("return.value"));
+
+        // None expired
+        testRunner.assertTransferCount(HoldFile.REL_EXPIRED, 0);
+    }
+}

Reply via email to