http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
index 0000000,be4aed6..1fe78af
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
@@@ -1,0 -1,261 +1,261 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.apache.nifi.components.AllowableValue;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.expression.AttributeExpression.ResultType;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ /**
+  * <p>
+  * This processor routes a FlowFile based on its flow file attributes by using
+  * the Attribute Expression Language. The Expression Language is used by adding
+  * Optional (dynamic) Properties to the processor. The value of each such
+  * Property is an Attribute Expression Language Expression that is evaluated
+  * against the FlowFile and must yield a boolean.
+  * </p>
+  * <p>
+  * How the results are used depends on the 'Routing Strategy' property:
+  * </p>
+  * <ul>
+  * <li><b>Route to Property name</b> (default): the name of each Property is
+  * the name of a relationship. The FlowFile is routed to the relationship of
+  * every Property whose expression evaluates to true; if more than one matches,
+  * the FlowFile is cloned so that one copy goes to each. If none match, the
+  * FlowFile is routed to 'unmatched'.</li>
+  * <li><b>Route to 'matched' if all match</b>: the FlowFile is routed to
+  * 'matched' only if every expression evaluates to true, otherwise to
+  * 'unmatched'.</li>
+  * <li><b>Route to 'matched' if any matches</b>: the FlowFile is routed to
+  * 'matched' if at least one expression evaluates to true, otherwise to
+  * 'unmatched'.</li>
+  * </ul>
+  * <p>
+  * Every FlowFile that is transferred carries the name of the relationship it
+  * was routed to in the {@link #ROUTE_ATTRIBUTE_KEY} attribute.
+  * </p>
+  *
+  * @author unattributed
+  */
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"attributes", "routing", "Attribute Expression Language", "regexp", "regex", "Regular Expression", "Expression Language"})
+ @CapabilityDescription("Routes FlowFiles based on their Attributes using the Attribute Expression Language")
+ public class RouteOnAttribute extends AbstractProcessor {
+
+     /** Name of the attribute that records the relationship a FlowFile was routed to. */
+     public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnAttribute.Route";
+
+     // keep the word 'match' instead of 'matched' in the stored values to maintain backward
+     // compatibility (there was a typo originally); only the display names say 'matched'
+     private static final String routeAllMatchValue = "Route to 'match' if all match";
+     private static final String routeAnyMatches = "Route to 'match' if any matches";
+     private static final String routePropertyNameValue = "Route to Property name";
+
+     public static final AllowableValue ROUTE_PROPERTY_NAME = new AllowableValue(
+             routePropertyNameValue,
+             "Route to Property name",
+             "A copy of the FlowFile will be routed to each relationship whose corresponding expression evaluates to 'true'"
+     );
+     public static final AllowableValue ROUTE_ALL_MATCH = new AllowableValue(
+             routeAllMatchValue,
+             "Route to 'matched' if all match",
+             "Requires that all user-defined expressions evaluate to 'true' for the FlowFile to be considered a match"
+     );
+     public static final AllowableValue ROUTE_ANY_MATCHES = new AllowableValue(
+             routeAnyMatches, // keep the word 'match' instead of 'matched' to maintain backward compatibility (there was a typo originally)
+             "Route to 'matched' if any matches",
+             "Requires that at least one user-defined expression evaluate to 'true' for the FlowFile to be considered a match"
+     );
+
+     public static final PropertyDescriptor ROUTE_STRATEGY = new PropertyDescriptor.Builder()
+             .name("Routing Strategy")
+             .description("Specifies how to determine which relationship to use when evaluating the Expression Language")
+             .required(true)
+             .allowableValues(ROUTE_PROPERTY_NAME, ROUTE_ALL_MATCH, ROUTE_ANY_MATCHES)
+             .defaultValue(ROUTE_PROPERTY_NAME.getValue())
+             .build();
+
+     public static final Relationship REL_NO_MATCH = new Relationship.Builder()
+             .name("unmatched")
+             .description("FlowFiles that do not match any user-defined expression will be routed here")
+             .build();
+     public static final Relationship REL_MATCH = new Relationship.Builder()
+             .name("matched")
+             .description("FlowFiles will be routed to 'matched' if one or all Expressions match, depending on the configuration of the Routing Strategy property")
+             .build();
+
+     // The holder itself never changes; only the set it refers to is swapped when
+     // the configuration changes.
+     private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
+     private List<PropertyDescriptor> properties;
+     // Last known value of the Routing Strategy property, tracked via onPropertyModified.
+     private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue();
+     // Names of the dynamic properties currently set, tracked via onPropertyModified.
+     private volatile Set<String> dynamicPropertyNames = new HashSet<>();
+
+     /**
+      * Sets up the initial state: the only relationship is 'unmatched' and the
+      * only supported (non-dynamic) property is the Routing Strategy.
+      *
+      * @param context the initialization context (not used here)
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> set = new HashSet<>();
+         set.add(REL_NO_MATCH);
+         relationships.set(set);
+
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(ROUTE_STRATEGY);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+
+     /**
+      * @return the relationships that correspond to the current configuration,
+      * as last computed by {@link #init} or {@link #onPropertyModified}
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships.get();
+     }
+
+     /**
+      * @return the fixed (non-dynamic) properties of this processor
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+
+     /**
+      * Builds the descriptor for a user-defined property. The value must be an
+      * Expression Language expression that yields a boolean.
+      *
+      * @param propertyDescriptorName the name the user gave the property
+      * @return an optional, dynamic descriptor with that name
+      */
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+         return new PropertyDescriptor.Builder()
+                 .required(false)
+                 .name(propertyDescriptorName)
+                 .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.BOOLEAN, false))
+                 .dynamic(true)
+                 .expressionLanguageSupported(true)
+                 .build();
+     }
+
+     /**
+      * Keeps the cached routing strategy and dynamic property names up to date
+      * and recomputes the set of relationships from them.
+      *
+      * @param descriptor the property that changed
+      * @param oldValue the previous value; null is treated as "property did not exist"
+      * @param newValue the new value; null is treated as "property was removed"
+      */
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+         if (descriptor.equals(ROUTE_STRATEGY)) {
+             // A null value means the property was unset, in which case the default
+             // applies; caching null here would advertise 'matched' even though
+             // onTrigger would route by property name.
+             configuredRouteStrategy = (newValue == null) ? ROUTE_STRATEGY.getDefaultValue() : newValue;
+         } else {
+             final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
+             if (newValue == null) {
+                 newDynamicPropertyNames.remove(descriptor.getName());
+             } else if (oldValue == null) {    // new property
+                 newDynamicPropertyNames.add(descriptor.getName());
+             }
+
+             this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
+         }
+
+         // formulate the new set of Relationships
+         final Set<String> allDynamicProps = this.dynamicPropertyNames;
+         final Set<Relationship> newRelationships = new HashSet<>();
+         final String routeStrategy = configuredRouteStrategy;
+         // Compare String with String so the result does not depend on how
+         // AllowableValue.equals treats a String argument.
+         if (ROUTE_PROPERTY_NAME.getValue().equals(routeStrategy)) {
+             for (final String propName : allDynamicProps) {
+                 newRelationships.add(new Relationship.Builder().name(propName).build());
+             }
+         } else {
+             newRelationships.add(REL_MATCH);
+         }
+
+         newRelationships.add(REL_NO_MATCH);
+         this.relationships.set(newRelationships);
+     }
+
+     /**
+      * Takes one FlowFile from the session, evaluates every dynamic property's
+      * expression against it and routes it according to the Routing Strategy.
+      * Returns without doing anything if no FlowFile is available.
+      *
+      * @param context provides the configured properties
+      * @param session the session used to obtain, clone and transfer FlowFiles
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+
+         final ProcessorLog logger = getLogger();
+
+         // One candidate relationship, named after the property, per dynamic property.
+         final Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
+         for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+             if (!descriptor.isDynamic()) {
+                 continue;
+             }
+
+             propertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
+         }
+
+         final Set<Relationship> matchingRelationships = new HashSet<>();
+         for (final Map.Entry<Relationship, PropertyValue> entry : propertyMap.entrySet()) {
+             final Boolean matches = entry.getValue().evaluateAttributeExpressions(flowFile).asBoolean();
+             // Boolean.TRUE.equals(..) treats a null result as "no match" instead of
+             // failing on unboxing.
+             if (Boolean.TRUE.equals(matches)) {
+                 matchingRelationships.add(entry.getKey());
+             }
+         }
+
+         final Set<Relationship> destinationRelationships = new HashSet<>();
+         switch (context.getProperty(ROUTE_STRATEGY).getValue()) {
+             case routeAllMatchValue:
+                 // Note: with no dynamic properties both sizes are zero, so the
+                 // FlowFile is considered a match.
+                 if (matchingRelationships.size() == propertyMap.size()) {
+                     destinationRelationships.add(REL_MATCH);
+                 } else {
+                     destinationRelationships.add(REL_NO_MATCH);
+                 }
+                 break;
+             case routeAnyMatches:
+                 if (matchingRelationships.isEmpty()) {
+                     destinationRelationships.add(REL_NO_MATCH);
+                 } else {
+                     destinationRelationships.add(REL_MATCH);
+                 }
+                 break;
+             case routePropertyNameValue:
+             default:
+                 destinationRelationships.addAll(matchingRelationships);
+                 break;
+         }
+
+         if (destinationRelationships.isEmpty()) {
+             // Only reachable with the property-name strategy when nothing matched.
+             logger.info(this + " routing " + flowFile + " to unmatched");
+             flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName());
+             session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
+             session.transfer(flowFile, REL_NO_MATCH);
+         } else {
+             final Iterator<Relationship> relationshipNameIterator = destinationRelationships.iterator();
+             // The original FlowFile goes to the first destination; every further
+             // destination receives a clone.
+             final Relationship firstRelationship = relationshipNameIterator.next();
+
+             // Clone while the original is still unmodified so that each clone gets
+             // only its own route attribute.
+             while (relationshipNameIterator.hasNext()) {
+                 final Relationship relationship = relationshipNameIterator.next();
+                 final FlowFile cloneFlowFile = session.clone(flowFile);
+                 logger.info(this + " cloned " + flowFile + " into " + cloneFlowFile + " and routing clone to relationship " + relationship);
+                 final FlowFile updatedFlowFile = session.putAttribute(cloneFlowFile, ROUTE_ATTRIBUTE_KEY, relationship.getName());
+                 session.getProvenanceReporter().route(updatedFlowFile, relationship);
+                 session.transfer(updatedFlowFile, relationship);
+             }
+
+             // now transfer the original flow file; add the attribute first so the
+             // provenance event is reported for the updated FlowFile, as in the
+             // other branches
+             logger.info("Routing {} to {}", new Object[]{flowFile, firstRelationship});
+             flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, firstRelationship.getName());
+             session.getProvenanceReporter().route(flowFile, firstRelationship);
+             session.transfer(flowFile, firstRelationship);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java
index 0000000,cb3cff2..3e581d2
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java
@@@ -1,0 -1,232 +1,232 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.nio.charset.Charset;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.expression.AttributeValueDecorator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.IntegerHolder;
+ 
+ /**
+  * Routes a FlowFile according to whether its content matches user-supplied
+  * regular expressions. Each dynamic property defines a relationship (the
+  * property name) and a regular expression (the property value). Up to
+  * 'Content Buffer Size' bytes of the content are decoded with the configured
+  * 'Character Set' and tested against every expression. The FlowFile is sent to
+  * the relationship of each expression that matches, being cloned when more
+  * than one matches, or to 'unmatched' when none does. The relationship name is
+  * recorded in the {@link #ROUTE_ATTRIBUTE_KEY} attribute.
+  */
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"route", "content", "regex", "regular expression", "regexp"})
+ @CapabilityDescription("Applies Regular Expressions to the content of a FlowFile and routes a copy of the FlowFile to each "
+         + "destination whose Regular Expression matches. Regular Expressions are added as User-Defined Properties where the name "
+         + "of the property is the name of the relationship and the value is a Regular Expression to match against the FlowFile "
+         + "content. User-Defined properties do support the Attribute Expression Language, but the results are interpreted as "
+         + "literal values, not Regular Expressions")
+ public class RouteOnContent extends AbstractProcessor {
+
+     /** Name of the attribute that records the relationship a FlowFile was routed to. */
+     public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnContent.Route";
+
+     public static final String MATCH_ALL = "content must match exactly";
+     public static final String MATCH_SUBSEQUENCE = "content must contain match";
+
+     public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
+             .name("Content Buffer Size")
+             .description("Specifies the maximum amount of data to buffer in order to apply the regular expressions. If the size of the FlowFile "
+                     + "exceeds this value, any data beyond this size will be ignored")
+             .required(true)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .build();
+     public static final PropertyDescriptor MATCH_REQUIREMENT = new PropertyDescriptor.Builder()
+             .name("Match Requirement")
+             .description("Specifies whether the entire content of the file must match the regular expression exactly, or if any part of the file "
+                     + "(up to Content Buffer Size) can contain the regular expression in order to be considered a match")
+             .required(true)
+             .allowableValues(MATCH_ALL, MATCH_SUBSEQUENCE)
+             .defaultValue(MATCH_ALL)
+             .build();
+     public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+             .name("Character Set")
+             .description("The Character Set in which the file is encoded")
+             .required(true)
+             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+             .defaultValue("UTF-8")
+             .build();
+
+     public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched")
+             .description("FlowFiles that do not match any of the user-supplied regular expressions will be routed to this relationship").build();
+
+     // Holds the current set of relationships; the set is replaced, never
+     // modified in place, when dynamic properties change.
+     private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
+     private List<PropertyDescriptor> properties;
+
+     /**
+      * Sets up the initial state: the only relationship is 'unmatched' and the
+      * supported properties are Match Requirement, Character Set and Content
+      * Buffer Size.
+      *
+      * @param context the initialization context (not used here)
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_NO_MATCH);
+         this.relationships.set(Collections.unmodifiableSet(relationships));
+
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(MATCH_REQUIREMENT);
+         properties.add(CHARACTER_SET);
+         properties.add(BUFFER_SIZE);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+
+     /**
+      * @return the fixed (non-dynamic) properties of this processor
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+
+     /**
+      * @return the relationships that correspond to the current configuration
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships.get();
+     }
+
+     /**
+      * Builds the descriptor for a user-defined property whose value is a
+      * regular expression.
+      *
+      * @param propertyDescriptorName the name the user gave the property
+      * @return an optional, dynamic descriptor with that name, or null if the
+      * name equals the name of the 'unmatched' relationship
+      */
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+         // A property with this name would produce a relationship with the same
+         // name as REL_NO_MATCH.
+         if (propertyDescriptorName.equals(REL_NO_MATCH.getName())) {
+             return null;
+         }
+
+         return new PropertyDescriptor.Builder()
+                 .required(false)
+                 .name(propertyDescriptorName)
+                 .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
+                 .dynamic(true)
+                 .expressionLanguageSupported(true)
+                 .build();
+     }
+
+     /**
+      * Adds or removes the relationship that belongs to a dynamic property when
+      * that property is set or removed. Changes to non-dynamic properties do not
+      * affect the relationships.
+      *
+      * @param descriptor the property that changed
+      * @param oldValue the previous value (not used)
+      * @param newValue the new value; null is treated as "property was removed"
+      */
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+         if (descriptor.isDynamic()) {
+             final Set<Relationship> relationships = new HashSet<>(this.relationships.get());
+             final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
+
+             if (newValue == null) {
+                 relationships.remove(relationship);
+             } else {
+                 relationships.add(relationship);
+             }
+
+             // Publish a read-only view, consistent with init().
+             this.relationships.set(Collections.unmodifiableSet(relationships));
+         }
+     }
+
+     /**
+      * Takes at most one FlowFile from the session, tests the buffered part of
+      * its content against the regular expression of every dynamic property and
+      * routes it (and clones of it, if several expressions match) accordingly.
+      * Returns without doing anything if no FlowFile is available.
+      *
+      * @param context provides the configured properties
+      * @param session the session used to obtain, read, clone and transfer FlowFiles
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final List<FlowFile> flowFiles = session.get(1);
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+
+         // Attribute values substituted by the Expression Language are quoted so
+         // that they are matched literally rather than interpreted as regex.
+         final AttributeValueDecorator quoteDecorator = new AttributeValueDecorator() {
+             @Override
+             public String decorate(final String attributeValue) {
+                 return (attributeValue == null) ? null : Pattern.quote(attributeValue);
+             }
+         };
+
+         final Map<FlowFile, Set<Relationship>> flowFileDestinationMap = new HashMap<>();
+         final ProcessorLog logger = getLogger();
+
+         final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+         final byte[] buffer = new byte[context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue()];
+         // The match requirement does not depend on the FlowFile or the property,
+         // so look it up once.
+         final boolean requireFullMatch = context.getProperty(MATCH_REQUIREMENT).getValue().equalsIgnoreCase(MATCH_ALL);
+
+         // Phase 1: determine the destinations of each FlowFile.
+         for (final FlowFile flowFile : flowFiles) {
+             final Set<Relationship> destinations = new HashSet<>();
+             flowFileDestinationMap.put(flowFile, destinations);
+
+             // Read at most buffer.length bytes; the rest of the content is ignored.
+             final IntegerHolder bufferedByteCount = new IntegerHolder(0);
+             session.read(flowFile, new InputStreamCallback() {
+                 @Override
+                 public void process(final InputStream in) throws IOException {
+                     bufferedByteCount.set(StreamUtils.fillBuffer(in, buffer, false));
+                 }
+             });
+
+             final String contentString = new String(buffer, 0, bufferedByteCount.get(), charset);
+
+             for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+                 if (!descriptor.isDynamic()) {
+                     continue;
+                 }
+
+                 final String regex = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile, quoteDecorator).getValue();
+                 final Pattern pattern = Pattern.compile(regex);
+                 final boolean matches;
+                 if (requireFullMatch) {
+                     matches = pattern.matcher(contentString).matches();
+                 } else {
+                     matches = pattern.matcher(contentString).find();
+                 }
+
+                 if (matches) {
+                     final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
+                     destinations.add(relationship);
+                 }
+             }
+         }
+
+         // Phase 2: tag, report and transfer.
+         for (final Map.Entry<FlowFile, Set<Relationship>> entry : flowFileDestinationMap.entrySet()) {
+             FlowFile flowFile = entry.getKey();
+             final Set<Relationship> destinations = entry.getValue();
+
+             if (destinations.isEmpty()) {
+                 flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName());
+                 // Report the provenance event before transferring, as in the
+                 // other branches.
+                 session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
+                 session.transfer(flowFile, REL_NO_MATCH);
+                 logger.info("Routing {} to 'unmatched'", new Object[]{flowFile});
+             } else {
+                 // The original FlowFile goes to the first destination; every
+                 // remaining destination receives a clone.
+                 final Relationship firstRelationship = destinations.iterator().next();
+                 destinations.remove(firstRelationship);
+
+                 for (final Relationship relationship : destinations) {
+                     FlowFile clone = session.clone(flowFile);
+                     clone = session.putAttribute(clone, ROUTE_ATTRIBUTE_KEY, relationship.getName());
+                     session.getProvenanceReporter().route(clone, relationship);
+                     session.transfer(clone, relationship);
+                     logger.info("Cloning {} to {} and routing clone to {}", new Object[]{flowFile, clone, relationship});
+                 }
+
+                 flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, firstRelationship.getName());
+                 session.getProvenanceReporter().route(flowFile, firstRelationship);
+                 session.transfer(flowFile, firstRelationship);
+                 logger.info("Routing {} to {}", new Object[]{flowFile, firstRelationship});
+             }
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
index 0000000,df13c66..6d48d02
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
@@@ -1,0 -1,229 +1,229 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.BufferedReader;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.nio.file.Paths;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+ import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"scan", "attributes", "search", "lookup"})
+ @CapabilityDescription("Scans the specified attributes of FlowFiles, checking 
to see if any of their values are "
+         + "present within the specified dictionary of terms")
+ public class ScanAttribute extends AbstractProcessor {
+ 
+     public static final String MATCH_CRITERIA_ALL = "All Must Match";
+     public static final String MATCH_CRITERIA_ANY = "At Least 1 Must Match";
+ 
+     public static final PropertyDescriptor MATCHING_CRITERIA = new 
PropertyDescriptor.Builder()
+             .name("Match Criteria")
+             .description("If set to All Must Match, then FlowFiles will be 
routed to 'matched' only if all specified "
+                     + "attributes' values are found in the dictionary. If set 
to At Least 1 Must Match, FlowFiles will "
+                     + "be routed to 'matched' if any attribute specified is 
found in the dictionary")
+             .required(true)
+             .allowableValues(MATCH_CRITERIA_ANY, MATCH_CRITERIA_ALL)
+             .defaultValue(MATCH_CRITERIA_ANY)
+             .build();
+     public static final PropertyDescriptor ATTRIBUTE_PATTERN = new 
PropertyDescriptor.Builder()
+             .name("Attribute Pattern")
+             .description("Regular Expression that specifies the names of 
attributes whose values will be matched against the terms in the dictionary")
+             .required(true)
+             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+             .defaultValue(".*")
+             .build();
+     public static final PropertyDescriptor DICTIONARY_FILE = new 
PropertyDescriptor.Builder()
+             .name("Dictionary File")
+             .description("A new-line-delimited text file that includes the 
terms that should trigger a match. Empty lines are ignored.")
+             .required(true)
+             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor DICTIONARY_FILTER = new 
PropertyDescriptor.Builder()
+             .name("Dictionary Filter Pattern")
+             .description("A Regular Expression that will be applied to each 
line in the dictionary file. If the regular expression does not match the line, 
the line will not be included in the list of terms to search for. If a Matching 
Group is specified, only the portion of the term that matches that Matching 
Group will be used instead of the entire term. If not specified, all terms in 
the dictionary will be used and each term will consist of the text of the 
entire line in the file")
+             .required(false)
+             .addValidator(StandardValidators.createRegexValidator(0, 1, 
false))
+             .defaultValue(null)
+             .build();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     private volatile Pattern dictionaryFilterPattern = null;
+     private volatile Pattern attributePattern = null;
+     private volatile Set<String> dictionaryTerms = null;
+     private volatile SynchronousFileWatcher fileWatcher = null;
+ 
+     public static final Relationship REL_MATCHED = new 
Relationship.Builder().name("matched").description("FlowFiles whose attributes 
are found in the dictionary will be routed to this relationship").build();
+     public static final Relationship REL_UNMATCHED = new 
Relationship.Builder().name("unmatched").description("FlowFiles whose 
attributes are not found in the dictionary will be routed to this 
relationship").build();
+ 
+     /**
+      * Builds the unmodifiable relationship set and property list that this processor exposes.
+      *
+      * @param context initialization context supplied by the framework; it is not consulted here
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> supportedRelationships = new HashSet<>();
+         supportedRelationships.add(REL_MATCHED);
+         supportedRelationships.add(REL_UNMATCHED);
+         this.relationships = Collections.unmodifiableSet(supportedRelationships);
+ 
+         // List order is preserved exactly as originally declared.
+         final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
+         supportedProperties.add(DICTIONARY_FILE);
+         supportedProperties.add(ATTRIBUTE_PATTERN);
+         supportedProperties.add(MATCHING_CRITERIA);
+         supportedProperties.add(DICTIONARY_FILTER);
+         this.properties = Collections.unmodifiableList(supportedProperties);
+     }
+ 
+     /**
+      * Returns the property descriptors assembled in {@code init}.
+      *
+      * @return the unmodifiable list stored in the {@code properties} field
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     /**
+      * Returns the relationships assembled in {@code init}.
+      *
+      * @return the unmodifiable set stored in the {@code relationships} field
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     /**
+      * Compiles the configured regular expressions, loads the dictionary and creates the watcher
+      * that {@code onTrigger} polls to detect changes to the dictionary file.
+      *
+      * @param context provides the configured property values
+      * @throws IOException if the dictionary file cannot be read
+      */
+     @OnScheduled
+     public void onScheduled(final ProcessContext context) throws IOException {
+         // The filter must be in place before createDictionary runs, because that method reads this field.
+         final String filterRegex = context.getProperty(DICTIONARY_FILTER).getValue();
+         if (filterRegex == null) {
+             this.dictionaryFilterPattern = null;
+         } else {
+             this.dictionaryFilterPattern = Pattern.compile(filterRegex);
+         }
+ 
+         // A null attribute pattern is treated by allMatch/anyMatch as "every attribute name is selected".
+         final String attributeRegex = context.getProperty(ATTRIBUTE_PATTERN).getValue();
+         if (attributeRegex.equals(".*")) {
+             this.attributePattern = null;
+         } else {
+             this.attributePattern = Pattern.compile(attributeRegex);
+         }
+ 
+         this.dictionaryTerms = createDictionary(context);
+ 
+         final String dictionaryPath = context.getProperty(DICTIONARY_FILE).getValue();
+         this.fileWatcher = new SynchronousFileWatcher(Paths.get(dictionaryPath), new LastModifiedMonitor(), 1000L);
+     }
+ 
+     /**
+      * Reads the dictionary file line by line and collects the terms to match against.
+      * Lines that are empty after trimming are skipped. When a dictionary filter pattern is set,
+      * lines that do not fully match it are skipped as well, and if the pattern declares exactly
+      * one capturing group the text of that group is used as the term instead of the whole line.
+      *
+      * @param context provides the configured dictionary file location
+      * @return an unmodifiable set of the collected terms
+      * @throws IOException if the dictionary file cannot be opened or read
+      */
+     private Set<String> createDictionary(final ProcessContext context) throws IOException {
+         final Set<String> loadedTerms = new HashSet<>();
+         final File dictionaryFile = new File(context.getProperty(DICTIONARY_FILE).getValue());
+ 
+         try (final InputStream rawIn = new FileInputStream(dictionaryFile);
+                 final BufferedReader lineReader = new BufferedReader(new InputStreamReader(rawIn))) {
+ 
+             for (String line = lineReader.readLine(); line != null; line = lineReader.readLine()) {
+                 if (line.trim().isEmpty()) {
+                     continue;
+                 }
+ 
+                 // Without a filter every non-empty line is a term as-is.
+                 if (dictionaryFilterPattern == null) {
+                     loadedTerms.add(line);
+                     continue;
+                 }
+ 
+                 final Matcher filterMatch = dictionaryFilterPattern.matcher(line);
+                 if (!filterMatch.matches()) {
+                     continue;
+                 }
+ 
+                 // Use only the captured portion when the regex declares a single Matching Group.
+                 final String matchingTerm = (filterMatch.groupCount() == 1) ? filterMatch.group(1) : line;
+                 loadedTerms.add(matchingTerm);
+             }
+         }
+ 
+         return Collections.unmodifiableSet(loadedTerms);
+     }
+ 
+     /**
+      * Pulls up to 50 FlowFiles, reloads the dictionary if the watcher reports a change, and routes
+      * each FlowFile to 'matched' or 'unmatched' according to the configured match criteria.
+      * A failed reload is logged and the previously loaded terms stay in use.
+      *
+      * @param context provides the configured property values
+      * @param session the session used to obtain, report on and transfer FlowFiles
+      * @throws ProcessException declared by the overridden method
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         final List<FlowFile> flowFiles = session.get(50);
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         try {
+             final boolean dictionaryChanged = fileWatcher.checkAndReset();
+             if (dictionaryChanged) {
+                 this.dictionaryTerms = createDictionary(context);
+             }
+         } catch (final IOException e) {
+             logger.error("Unable to reload dictionary due to {}", e);
+         }
+ 
+         final String criteria = context.getProperty(MATCHING_CRITERIA).getValue();
+         final boolean matchAll = criteria.equals(MATCH_CRITERIA_ALL);
+ 
+         for (final FlowFile flowFile : flowFiles) {
+             final boolean matched;
+             if (matchAll) {
+                 matched = allMatch(flowFile, attributePattern, dictionaryTerms);
+             } else {
+                 matched = anyMatch(flowFile, attributePattern, dictionaryTerms);
+             }
+ 
+             final Relationship relationship;
+             if (matched) {
+                 relationship = REL_MATCHED;
+             } else {
+                 relationship = REL_UNMATCHED;
+             }
+ 
+             session.getProvenanceReporter().route(flowFile, relationship);
+             session.transfer(flowFile, relationship);
+             logger.info("Transferred {} to {}", new Object[]{flowFile, relationship});
+         }
+     }
+ 
+     private boolean allMatch(final FlowFile flowFile, final Pattern 
attributePattern, final Set<String> dictionary) {
+         for (final Map.Entry<String, String> entry : 
flowFile.getAttributes().entrySet()) {
+             if (attributePattern == null || 
attributePattern.matcher(entry.getKey()).matches()) {
+                 if (!dictionary.contains(entry.getValue())) {
+                     return false;
+                 }
+             }
+         }
+ 
+         return true;
+     }
+ 
+     private boolean anyMatch(final FlowFile flowFile, final Pattern 
attributePattern, final Set<String> dictionary) {
+         for (final Map.Entry<String, String> entry : 
flowFile.getAttributes().entrySet()) {
+             if (attributePattern == null || 
attributePattern.matcher(entry.getKey()).matches()) {
+                 if (dictionary.contains(entry.getValue())) {
+                     return true;
+                 }
+             }
+         }
+ 
+         return false;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java
index 0000000,9f53469..d9f2034
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java
@@@ -1,0 -1,292 +1,292 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.BufferedReader;
+ import java.io.Closeable;
+ import java.io.DataInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.nio.charset.Charset;
+ import java.nio.file.Files;
+ import java.nio.file.Paths;
+ import java.nio.file.StandardOpenOption;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+ import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.ObjectHolder;
+ import org.apache.nifi.util.search.Search;
+ import org.apache.nifi.util.search.SearchTerm;
+ import org.apache.nifi.util.search.ahocorasick.AhoCorasick;
+ import org.apache.nifi.util.search.ahocorasick.SearchState;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"aho-corasick", "scan", "content", "byte sequence", "search", "find", 
"dictionary"})
+ @CapabilityDescription("Scans the content of FlowFiles for terms that are 
found in a user-supplied dictionary. If a term is matched, the UTF-8 encoded 
version of the term will be added to the FlowFile using the 'matching.term' 
attribute")
+ public class ScanContent extends AbstractProcessor {
+ 
+     public static final String TEXT_ENCODING = "text";
+     public static final String BINARY_ENCODING = "binary";
+     public static final String MATCH_ATTRIBUTE_KEY = "matching.term";
+ 
+     public static final PropertyDescriptor DICTIONARY = new 
PropertyDescriptor.Builder()
+             .name("Dictionary File")
+             .description("The filename of the terms dictionary")
+             .required(true)
+             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor DICTIONARY_ENCODING = new 
PropertyDescriptor.Builder()
+             .name("Dictionary Encoding")
+             .description("Indicates how the dictionary is encoded. If 'text', 
dictionary terms are new-line delimited and UTF-8 encoded; if 'binary', 
dictionary terms are denoted by a 4-byte integer indicating the term length 
followed by the term itself")
+             .required(true)
+             .allowableValues(TEXT_ENCODING, BINARY_ENCODING)
+             .defaultValue(TEXT_ENCODING)
+             .build();
+ 
+     public static final Relationship REL_MATCH = new 
Relationship.Builder().name("matched").description("FlowFiles that match at 
least one term in the dictionary are routed to this relationship").build();
+     public static final Relationship REL_NO_MATCH = new 
Relationship.Builder().name("unmatched").description("FlowFiles that do not 
match any term in the dictionary are routed to this relationship").build();
+ 
+     public static final Charset UTF8 = Charset.forName("UTF-8");
+ 
+     private final AtomicReference<SynchronousFileWatcher> fileWatcherRef = 
new AtomicReference<>();
+     private final AtomicReference<Search<byte[]>> searchRef = new 
AtomicReference<>();
+     private final ReentrantLock dictionaryUpdateLock = new ReentrantLock();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     /**
+      * Builds the unmodifiable relationship set and property list that this processor exposes.
+      *
+      * @param context initialization context supplied by the framework; it is not consulted here
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> supportedRelationships = new HashSet<>();
+         supportedRelationships.add(REL_MATCH);
+         supportedRelationships.add(REL_NO_MATCH);
+         this.relationships = Collections.unmodifiableSet(supportedRelationships);
+ 
+         final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
+         supportedProperties.add(DICTIONARY);
+         supportedProperties.add(DICTIONARY_ENCODING);
+         this.properties = Collections.unmodifiableList(supportedProperties);
+     }
+ 
+     /**
+      * Returns the property descriptors assembled in {@code init}.
+      *
+      * @return the unmodifiable list stored in the {@code properties} field
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     /**
+      * Returns the relationships assembled in {@code init}.
+      *
+      * @return the unmodifiable set stored in the {@code relationships} field
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     /**
+      * Replaces the dictionary file watcher whenever the Dictionary File property changes, so that
+      * {@code onTrigger} watches the newly configured path. Changes to other properties are ignored.
+      *
+      * @param descriptor the property that was modified
+      * @param oldValue the previous value; not used
+      * @param newValue the new value, used as the path to watch when the dictionary property changed
+      */
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+         if (!descriptor.equals(DICTIONARY)) {
+             return;
+         }
+ 
+         final SynchronousFileWatcher replacementWatcher = new SynchronousFileWatcher(Paths.get(newValue), new LastModifiedMonitor(), 60000L);
+         fileWatcherRef.set(replacementWatcher);
+     }
+ 
+     private boolean reloadDictionary(final ProcessContext context, final 
boolean force, final ProcessorLog logger) throws IOException {
+         boolean obtainedLock;
+         if (force) {
+             dictionaryUpdateLock.lock();
+             obtainedLock = true;
+         } else {
+             obtainedLock = dictionaryUpdateLock.tryLock();
+         }
+ 
+         if (obtainedLock) {
+             try {
+                 final Search<byte[]> search = new AhoCorasick<>();
+                 final Set<SearchTerm<byte[]>> terms = new HashSet<>();
+ 
+                 final InputStream inStream = 
Files.newInputStream(Paths.get(context.getProperty(DICTIONARY).getValue()), 
StandardOpenOption.READ);
+ 
+                 final TermLoader termLoader;
+                 if 
(context.getProperty(DICTIONARY_ENCODING).getValue().equalsIgnoreCase(TEXT_ENCODING))
 {
+                     termLoader = new TextualTermLoader(inStream);
+                 } else {
+                     termLoader = new BinaryTermLoader(inStream);
+                 }
+ 
+                 try {
+                     SearchTerm<byte[]> term;
+                     while ((term = termLoader.nextTerm()) != null) {
+                         terms.add(term);
+                     }
+ 
+                     search.initializeDictionary(terms);
+                     searchRef.set(search);
+                     logger.info("Loaded search dictionary from {}", new 
Object[]{context.getProperty(DICTIONARY).getValue()});
+                     return true;
+                 } finally {
+                     termLoader.close();
+                 }
+             } finally {
+                 dictionaryUpdateLock.unlock();
+             }
+         } else {
+             return false;
+         }
+     }
+ 
+     /**
+      * Scans the content of one FlowFile for any dictionary term and routes it accordingly.
+      * The dictionary is reloaded first when the file watcher reports a change, and loaded lazily
+      * when no search structure exists yet. If no search structure is available the method returns
+      * without pulling a FlowFile. On a match, the matching term (decoded as UTF-8) is stored in the
+      * 'matching.term' attribute before the FlowFile is routed to 'matched'.
+      *
+      * @param context provides the configured property values
+      * @param session the session used to obtain, read, update and transfer the FlowFile
+      * @throws ProcessException if the dictionary cannot be loaded
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         final ProcessorLog logger = getLogger();
+         final SynchronousFileWatcher fileWatcher = fileWatcherRef.get();
+ 
+         Search<byte[]> search;
+         try {
+             // A changed dictionary file forces a reload, waiting for the update lock if necessary.
+             if (fileWatcher.checkAndReset()) {
+                 reloadDictionary(context, true, logger);
+             }
+ 
+             // No dictionary yet: try to load it, but do not wait if another thread is already loading.
+             search = searchRef.get();
+             if (search == null && reloadDictionary(context, false, logger)) {
+                 search = searchRef.get();
+             }
+         } catch (final IOException e) {
+             throw new ProcessException(e);
+         }
+ 
+         if (search == null) {
+             return;
+         }
+ 
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final Search<byte[]> finalSearch = search;
+         final ObjectHolder<SearchTerm<byte[]>> termRef = new ObjectHolder<>(null);
+ 
+         session.read(flowFile, new InputStreamCallback() {
+             @Override
+             public void process(final InputStream rawIn) throws IOException {
+                 try (final InputStream in = new BufferedInputStream(rawIn)) {
+                     final SearchState<byte[]> searchResult = finalSearch.search(in, false);
+                     if (searchResult.foundMatch()) {
+                         // Only the first reported term is recorded.
+                         termRef.set(searchResult.getResults().keySet().iterator().next());
+                     }
+                 }
+             }
+         });
+ 
+         final SearchTerm<byte[]> matchingTerm = termRef.get();
+         if (matchingTerm == null) {
+             logger.info("Routing {} to 'unmatched'", new Object[]{flowFile});
+             session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
+             session.transfer(flowFile, REL_NO_MATCH);
+         } else {
+             final String matchingTermString = matchingTerm.toString(UTF8);
+             logger.info("Routing {} to 'matched' because it matched term {}", new Object[]{flowFile, matchingTermString});
+             flowFile = session.putAttribute(flowFile, MATCH_ATTRIBUTE_KEY, matchingTermString);
+             session.getProvenanceReporter().route(flowFile, REL_MATCH);
+             session.transfer(flowFile, REL_MATCH);
+         }
+     }
+ 
+     /**
+      * Supplies dictionary terms one at a time; closing the loader releases whatever it reads from.
+      */
+     private static interface TermLoader extends Closeable {
+ 
+         /**
+          * Reads the next term from the dictionary.
+          *
+          * @return the next term, or {@code null} when there are no more terms (callers stop on null)
+          * @throws IOException if the underlying source cannot be read
+          */
+         SearchTerm<byte[]> nextTerm() throws IOException;
+     }
+ 
+     /**
+      * Loads terms from a new-line delimited text dictionary, one term per line.
+      * Lines are decoded as UTF-8, matching the documented dictionary encoding, and each term is
+      * handed to the search as its UTF-8 bytes.
+      */
+     private static class TextualTermLoader implements TermLoader {
+ 
+         // Decode and encode with the same explicit charset rather than the platform default,
+         // so non-ASCII terms survive the round trip regardless of the JVM's default encoding.
+         private static final Charset TERM_CHARSET = Charset.forName("UTF-8");
+ 
+         private final BufferedReader reader;
+ 
+         /**
+          * @param inStream the dictionary content; it is closed when this loader is closed
+          */
+         public TextualTermLoader(final InputStream inStream) {
+             this.reader = new BufferedReader(new InputStreamReader(inStream, TERM_CHARSET));
+         }
+ 
+         /**
+          * @return a term built from the next line, or {@code null} once the end of the dictionary is reached
+          * @throws IOException if the dictionary cannot be read
+          */
+         @Override
+         public SearchTerm<byte[]> nextTerm() throws IOException {
+             final String nextLine = reader.readLine();
+             if (nextLine == null) {
+                 return null;
+             }
+             return new SearchTerm<>(nextLine.getBytes(TERM_CHARSET));
+         }
+ 
+         @Override
+         public void close() throws IOException {
+             this.reader.close();
+         }
+     }
+ 
+     /**
+      * Loads terms from a binary dictionary in which each term is a 4-byte integer length
+      * followed by that many bytes of term data.
+      */
+     private static class BinaryTermLoader implements TermLoader {
+ 
+         private final DataInputStream inStream;
+ 
+         /**
+          * @param inStream the dictionary content; it is closed when this loader is closed
+          */
+         public BinaryTermLoader(final InputStream inStream) {
+             this.inStream = new DataInputStream(new BufferedInputStream(inStream));
+         }
+ 
+         /**
+          * @return the next term, or {@code null} once the end of the dictionary is reached
+          * @throws IOException if the dictionary cannot be read, ends in the middle of a term,
+          *         or declares a negative term length
+          */
+         @Override
+         public SearchTerm<byte[]> nextTerm() throws IOException {
+             // Peek one byte to tell a clean end-of-file apart from a truncated record.
+             inStream.mark(1);
+             final int nextByte = inStream.read();
+             if (nextByte == -1) {
+                 return null;
+             }
+ 
+             inStream.reset();
+             final int termLength = inStream.readInt();
+             if (termLength < 0) {
+                 // Report corrupt input as an I/O problem instead of failing with NegativeArraySizeException.
+                 throw new IOException("Invalid dictionary: negative term length " + termLength);
+             }
+ 
+             final byte[] term = new byte[termLength];
+             inStream.readFully(term);
+ 
+             return new SearchTerm<>(term);
+         }
+ 
+         @Override
+         public void close() throws IOException {
+             this.inStream.close();
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
index 0000000,1df4de6..dfdd401
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
@@@ -1,0 -1,163 +1,163 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"segment", "split"})
+ @CapabilityDescription("Segments a FlowFile into multiple smaller segments on 
byte boundaries. Each segment is given the following attributes: "
+         + "fragment.identifier, fragment.index, fragment.count, 
segment.original.filename; these attributes can then be used by the "
+         + "MergeContent processor in order to reconstitute the original 
FlowFile")
+ public class SegmentContent extends AbstractProcessor {
+ 
+     public static final String SEGMENT_ID = "segment.identifier";
+     public static final String SEGMENT_INDEX = "segment.index";
+     public static final String SEGMENT_COUNT = "segment.count";
+     public static final String SEGMENT_ORIGINAL_FILENAME = 
"segment.original.filename";
+ 
+     public static final String FRAGMENT_ID = "fragment.identifier";
+     public static final String FRAGMENT_INDEX = "fragment.index";
+     public static final String FRAGMENT_COUNT = "fragment.count";
+ 
+     public static final PropertyDescriptor SIZE = new 
PropertyDescriptor.Builder()
+             .name("Segment Size")
+             .description("The maximum data size for each segment")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .required(true)
+             .build();
+ 
+     public static final Relationship REL_SEGMENTS = new 
Relationship.Builder().name("segments").description("All segments will be sent 
to this relationship. If the file was small enough that it was not segmented, a 
copy of the original is sent to this relationship as well as original").build();
+     public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original").description("The original FlowFile will 
be sent to this relationship").build();
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> propertyDescriptors;
+ 
+     /**
+      * Builds the unmodifiable property list and relationship set that this processor exposes.
+      *
+      * @param context initialization context supplied by the framework; it is not consulted here
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
+         supportedProperties.add(SIZE);
+         this.propertyDescriptors = Collections.unmodifiableList(supportedProperties);
+ 
+         final Set<Relationship> supportedRelationships = new HashSet<>();
+         supportedRelationships.add(REL_SEGMENTS);
+         supportedRelationships.add(REL_ORIGINAL);
+         this.relationships = Collections.unmodifiableSet(supportedRelationships);
+     }
+ 
+     /**
+      * Returns the relationships assembled in {@code init}.
+      *
+      * @return the unmodifiable set stored in the {@code relationships} field
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     /**
+      * Returns the property descriptors assembled in {@code init}.
+      *
+      * @return the unmodifiable list stored in the {@code propertyDescriptors} field
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return propertyDescriptors;
+     }
+ 
+     /**
+      * Splits one FlowFile into segments of at most the configured Segment Size.
+      * A FlowFile no larger than the segment size is tagged as segment 1 of 1, sent to 'original',
+      * and a clone of it is sent to 'segments'. Otherwise one clone per byte range is created, tagged
+      * with segment.* and fragment.* attributes and sent to 'segments', while the untouched
+      * FlowFile goes to 'original'.
+      *
+      * @param context provides the configured segment size
+      * @param session the session used to obtain, clone, update and transfer FlowFiles
+      * @throws ProcessException wrapping any exception raised while segmenting
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         try {
+             final String segmentId = UUID.randomUUID().toString();
+             final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
+             final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ 
+             final boolean fitsInOneSegment = flowFile.getSize() <= segmentSize;
+             if (fitsInOneSegment) {
+                 flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
+                 flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
+                 flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
+                 flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
+ 
+                 flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
+                 flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
+                 flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");
+ 
+                 // The clone is taken after tagging, so both copies carry the same attributes.
+                 final FlowFile segmentCopy = session.clone(flowFile);
+                 session.transfer(flowFile, REL_ORIGINAL);
+                 session.transfer(segmentCopy, REL_SEGMENTS);
+                 return;
+             }
+ 
+             // Round the segment count up when the content does not divide evenly.
+             final long contentSize = flowFile.getSize();
+             int totalSegments = (int) (contentSize / segmentSize);
+             if (totalSegments * segmentSize < contentSize) {
+                 totalSegments++;
+             }
+             final String segmentCount = String.valueOf(totalSegments);
+ 
+             final Map<String, String> segmentAttributes = new HashMap<>();
+             segmentAttributes.put(SEGMENT_ID, segmentId);
+             segmentAttributes.put(SEGMENT_COUNT, segmentCount);
+             segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
+ 
+             segmentAttributes.put(FRAGMENT_ID, segmentId);
+             segmentAttributes.put(FRAGMENT_COUNT, segmentCount);
+ 
+             final Set<FlowFile> segmentSet = new HashSet<>();
+             long segmentOffset = 0L;
+             for (int index = 1; index <= totalSegments; index++) {
+                 // The last segment may be shorter than the configured size.
+                 final long segmentLength = Math.min(segmentSize, contentSize - segmentOffset);
+                 FlowFile segment = session.clone(flowFile, segmentOffset, segmentLength);
+ 
+                 final String indexValue = String.valueOf(index);
+                 segmentAttributes.put(SEGMENT_INDEX, indexValue);
+                 segmentAttributes.put(FRAGMENT_INDEX, indexValue);
+                 segment = session.putAllAttributes(segment, segmentAttributes);
+                 segmentSet.add(segment);
+ 
+                 segmentOffset += segmentSize;
+             }
+ 
+             session.transfer(segmentSet, REL_SEGMENTS);
+             session.transfer(flowFile, REL_ORIGINAL);
+ 
+             // Only list the individual segments in the log when there are few of them.
+             if (totalSegments > 10) {
+                 getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments});
+             } else {
+                 getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet});
+             }
+         } catch (final Exception e) {
+             throw new ProcessException(e);
+         }
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
index 0000000,7e67c01..8c00a7e
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
@@@ -1,0 -1,260 +1,260 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.util.NaiveSearchRingBuffer;
+ import org.apache.nifi.util.Tuple;
+ 
+ import org.apache.commons.codec.binary.Hex;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"content", "split", "binary"})
+ @CapabilityDescription("Splits incoming FlowFiles by a specified byte 
sequence")
+ public class SplitContent extends AbstractProcessor {
+ 
+     // Attribute keys set on every split FlowFile
+ 
+     /** Attribute holding the random identifier shared by all splits of one source FlowFile. */
+     public static final String FRAGMENT_ID = "fragment.identifier";
+     /** Attribute holding the one-based position of a split among its siblings. */
+     public static final String FRAGMENT_INDEX = "fragment.index";
+     /** Attribute holding the number of splits produced from the source FlowFile. */
+     public static final String FRAGMENT_COUNT = "fragment.count";
+     /** Attribute holding the filename attribute of the source FlowFile. */
+     public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ 
+     /** Required property: the hex-encoded delimiter on which content is split. */
+     public static final PropertyDescriptor BYTE_SEQUENCE = new PropertyDescriptor.Builder()
+             .name("Byte Sequence")
+             .description("A hex representation of bytes to look for and upon which to split the source file into separate files")
+             .addValidator(new HexStringPropertyValidator())
+             .required(true)
+             .build();
+ 
+     /** Required property: whether each split keeps the delimiter at its end (defaults to "false"). */
+     public static final PropertyDescriptor KEEP_SEQUENCE = new PropertyDescriptor.Builder()
+             .name("Keep Byte Sequence")
+             .description("Determines whether or not the Byte Sequence should be included at the end of each Split")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     /** Destination of every split FlowFile. */
+     public static final Relationship REL_SPLITS = new Relationship.Builder()
+             .name("splits")
+             .description("All Splits will be routed to the splits relationship")
+             .build();
+ 
+     /** Destination of the unmodified incoming FlowFile. */
+     public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+             .name("original")
+             .description("The original file")
+             .build();
+ 
+     // Both collections are assigned once, in init(), as unmodifiable views.
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+ 
+     // Decoded form of the Byte Sequence property; written by onPropertyModified and
+     // read by onTrigger. Holds null when the property value could not be decoded.
+     private final AtomicReference<byte[]> byteSequence = new AtomicReference<>();
+ 
+     /**
+      * Builds the property list and relationship set once and stores them as
+      * unmodifiable collections for the accessors below.
+      *
+      * @param context the initialization context (not used here)
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         descriptors.add(BYTE_SEQUENCE);
+         descriptors.add(KEEP_SEQUENCE);
+         this.properties = Collections.unmodifiableList(descriptors);
+ 
+         final Set<Relationship> rels = new HashSet<>();
+         rels.add(REL_SPLITS);
+         rels.add(REL_ORIGINAL);
+         this.relationships = Collections.unmodifiableSet(rels);
+     }
+ 
+     /**
+      * @return the unmodifiable set of relationships assembled in init
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return this.relationships;
+     }
+ 
+     /**
+      * @return the unmodifiable list of property descriptors assembled in init
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return this.properties;
+     }
+ 
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+         if (descriptor.equals(BYTE_SEQUENCE)) {
+             try {
+                 this.byteSequence.set(Hex.decodeHex(newValue.toCharArray()));
+             } catch (final Exception e) {
+                 this.byteSequence.set(null);
+             }
+         }
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         final FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final boolean keepSequence = 
context.getProperty(KEEP_SEQUENCE).asBoolean();
+         final byte[] byteSequence = this.byteSequence.get();
+         if (byteSequence == null) {   // should never happen. But just in 
case...
+             logger.error("{} Unable to obtain Byte Sequence", new 
Object[]{this});
+             session.rollback();
+             return;
+         }
+ 
+         final List<Tuple<Long, Long>> splits = new ArrayList<>();
+ 
+         final NaiveSearchRingBuffer buffer = new 
NaiveSearchRingBuffer(byteSequence);
+         session.read(flowFile, new InputStreamCallback() {
+             @Override
+             public void process(final InputStream rawIn) throws IOException {
+                 long bytesRead = 0L;
+                 long startOffset = 0L;
+ 
+                 try (final InputStream in = new BufferedInputStream(rawIn)) {
+                     while (true) {
+                         final int nextByte = in.read();
+                         if (nextByte == -1) {
+                             return;
+                         }
+ 
+                         bytesRead++;
+                         boolean matched = buffer.addAndCompare((byte) 
(nextByte & 0xFF));
+                         if (matched) {
+                             final long splitLength;
+ 
+                             if (keepSequence) {
+                                 splitLength = bytesRead - startOffset;
+                             } else {
+                                 splitLength = bytesRead - startOffset - 
byteSequence.length;
+                             }
+ 
+                             splits.add(new Tuple<>(startOffset, splitLength));
+                             startOffset = bytesRead;
+                             buffer.clear();
+                         }
+                     }
+                 }
+             }
+         });
+ 
+         long lastOffsetPlusSize = -1L;
+         if (splits.isEmpty()) {
+             FlowFile clone = session.clone(flowFile);
+             session.transfer(flowFile, REL_ORIGINAL);
+             session.transfer(clone, REL_SPLITS);
+             logger.info("Found no match for {}; transferring original 
'original' and transferring clone {} to 'splits'", new Object[]{flowFile, 
clone});
+             return;
+         }
+ 
+         final ArrayList<FlowFile> splitList = new ArrayList<>();
+         for (final Tuple<Long, Long> tuple : splits) {
+             long offset = tuple.getKey();
+             long size = tuple.getValue();
+             if (size > 0) {
+                 FlowFile split = session.clone(flowFile, offset, size);
+                 splitList.add(split);
+             }
+ 
+             lastOffsetPlusSize = offset + size;
+         }
+ 
+         long finalSplitOffset = lastOffsetPlusSize;
+         if (!keepSequence) {
+             finalSplitOffset += byteSequence.length;
+         }
+         if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) {
+             FlowFile finalSplit = session.clone(flowFile, finalSplitOffset, 
flowFile.getSize() - finalSplitOffset);
+             splitList.add(finalSplit);
+         }
+ 
+         finishFragmentAttributes(session, flowFile, splitList);
+         session.transfer(splitList, REL_SPLITS);
+         session.transfer(flowFile, REL_ORIGINAL);
+ 
+         if (splitList.size() > 10) {
+             logger.info("Split {} into {} files", new Object[]{flowFile, 
splitList.size()});
+         } else {
+             logger.info("Split {} into {} files: {}", new Object[]{flowFile, 
splitList.size(), splitList});
+         }
+     }
+ 
+     /**
+      * Applies the fragment identifier, index, count and original filename
+      * attributes to every split.
+      * <p>
+      * All splits share one freshly generated identifier and are numbered from 1
+      * in list order. The list is updated in place: each element is replaced by
+      * the FlowFile returned from the session after its attributes were set.
+      *
+      * @param session the session used to update the FlowFiles
+      * @param source the FlowFile that was split; supplies the original filename
+      * @param splits the split FlowFiles, in order; must support element replacement
+      */
+     private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
+         final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
+         final String fragmentId = UUID.randomUUID().toString();
+         final String fragmentCount = String.valueOf(splits.size());
+ 
+         for (int i = 0; i < splits.size(); i++) {
+             final Map<String, String> attributes = new HashMap<>();
+             attributes.put(FRAGMENT_ID, fragmentId);
+             attributes.put(FRAGMENT_INDEX, String.valueOf(i + 1));
+             attributes.put(FRAGMENT_COUNT, fragmentCount);
+             attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
+             splits.set(i, session.putAllAttributes(splits.get(i), attributes));
+         }
+     }
+ 
+     /**
+      * Accepts a property value only if it is a non-empty string that decodes as
+      * hexadecimal.
+      * <p>
+      * An empty string decodes to a zero-length byte sequence, which cannot act
+      * as a delimiter, so it is rejected at configuration time.
+      */
+     static class HexStringPropertyValidator implements Validator {
+ 
+         /**
+          * @param subject the name of what is being validated
+          * @param input the candidate hex string
+          * @param validationContext the validation context (not used)
+          * @return a valid result when input is non-empty hex, otherwise an
+          *         invalid result carrying an explanation
+          */
+         @Override
+         public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
+             if (input == null) {
+                 return new ValidationResult.Builder().valid(false).explanation("Not a valid Hex String")
+                         .input(input).subject(subject).build();
+             }
+             if (input.isEmpty()) {
+                 return new ValidationResult.Builder().valid(false).explanation("Hex String must not be empty")
+                         .input(input).subject(subject).build();
+             }
+ 
+             try {
+                 Hex.decodeHex(input.toCharArray());
+             } catch (final Exception e) {
+                 return new ValidationResult.Builder().valid(false).explanation("Not a valid Hex String")
+                         .input(input).subject(subject).build();
+             }
+             return new ValidationResult.Builder().valid(true).input(input).subject(subject).build();
+         }
+     }
+ }

Reply via email to