http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java
index 0000000,1fea36d..8910fdc
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java
@@@ -1,0 -1,249 +1,249 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.nio.charset.StandardCharsets;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.distributed.cache.client.Deserializer;
+ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+ import org.apache.nifi.distributed.cache.client.Serializer;
+ import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+ import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
+ import org.apache.nifi.expression.AttributeExpression.ResultType;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ import org.apache.commons.lang3.StringUtils;
+ 
+ @EventDriven
+ @SupportsBatching
+ @Tags({"experimental", "hash", "dupe", "duplicate", "dedupe"})
+ @CapabilityDescription("Caches a value, computed from FlowFile attributes, 
for each incoming FlowFile and determines if the cached value has already been 
seen. "
+         + "If so, routes the FlowFile to 'duplicate' with an attribute named 
'original.identifier' that specifies the original FlowFile's"
+         + "\"description\", which is specified in the <FlowFile Description> 
property. If the FlowFile is not determined to be a duplicate, the Processor "
+         + "routes the FlowFile to 'non-duplicate'")
+ public class DetectDuplicate extends AbstractProcessor {
+ 
+     public static final String ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME = 
"original.flowfile.description";
+ 
+     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
+             .name("Distributed Cache Service")
+             .description("The Controller Service that is used to cache unique 
identifiers, used to determine duplicates")
+             .required(true)
+             .identifiesControllerService(DistributedMapCacheClient.class)
+             .build();
+     public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new 
PropertyDescriptor.Builder()
+             .name("Cache Entry Identifier")
+             .description(
+                     "A FlowFile attribute, or the results of an Attribute 
Expression Language statement, which will be evaluated "
+                     + "against a FlowFile in order to determine the value 
used to identify duplicates; it is this value that is cached")
+             .required(true)
+             
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
 true))
+             .defaultValue("${hash.value}")
+             .expressionLanguageSupported(true)
+             .build();
+     public static final PropertyDescriptor FLOWFILE_DESCRIPTION = new 
PropertyDescriptor.Builder()
+             .name("FlowFile Description")
+             .description(
+                     "When a FlowFile is added to the cache, this value is 
stored along with it so that if a duplicate is found, this description of the 
original FlowFile will be added to the duplicate's \""
+                     + ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME + "\" attribute")
+             .required(true)
+             
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
 true))
+             .expressionLanguageSupported(true)
+             .defaultValue("")
+             .build();
+ 
+     public static final PropertyDescriptor AGE_OFF_DURATION = new 
PropertyDescriptor.Builder()
+             .name("Age Off Duration")
+             .description("Time interval to age off cached FlowFiles")
+             .required(false)
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .build();
+ 
+     public static final Relationship REL_DUPLICATE = new 
Relationship.Builder().name("duplicate")
+             .description("If a FlowFile has been detected to be a duplicate, 
it will be routed to this relationship").build();
+     public static final Relationship REL_NON_DUPLICATE = new 
Relationship.Builder().name("non-duplicate")
+             .description("If a FlowFile's Cache Entry Identifier was not 
found in the cache, it will be routed to this relationship").build();
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+             .description("If unable to communicate with the cache, the 
FlowFile will be penalized and routed to this relationship").build();
+     private final Set<Relationship> relationships;
+ 
+     private final Serializer<String> keySerializer = new StringSerializer();
+     private final Serializer<CacheValue> valueSerializer = new 
CacheValueSerializer();
+     private final Deserializer<CacheValue> valueDeserializer = new 
CacheValueDeserializer();
+ 
+     public DetectDuplicate() {
+         final Set<Relationship> rels = new HashSet<>();
+         rels.add(REL_DUPLICATE);
+         rels.add(REL_NON_DUPLICATE);
+         rels.add(REL_FAILURE);
+         relationships = Collections.unmodifiableSet(rels);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         descriptors.add(CACHE_ENTRY_IDENTIFIER);
+         descriptors.add(FLOWFILE_DESCRIPTION);
+         descriptors.add(AGE_OFF_DURATION);
+         descriptors.add(DISTRIBUTED_CACHE_SERVICE);
+         return descriptors;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final String cacheKey = 
context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+         if (StringUtils.isBlank(cacheKey)) {
+             logger.error("FlowFile {} has no attribute for given Cache Entry 
Identifier", new Object[]{flowFile});
+             flowFile = session.penalize(flowFile);
+             session.transfer(flowFile, REL_FAILURE);
+             return;
+         }
+         final DistributedMapCacheClient cache = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+         final Long durationMS = 
context.getProperty(AGE_OFF_DURATION).asTimePeriod(TimeUnit.MILLISECONDS);
+         final long now = System.currentTimeMillis();
+ 
+         try {
+             final String flowFileDescription = 
context.getProperty(FLOWFILE_DESCRIPTION).evaluateAttributeExpressions(flowFile).getValue();
+             final CacheValue cacheValue = new CacheValue(flowFileDescription, 
now);
+             final CacheValue originalCacheValue = 
cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, 
valueDeserializer);
+             boolean duplicate = originalCacheValue != null;
+             if (duplicate && durationMS != null && (now >= 
originalCacheValue.getEntryTimeMS() + durationMS)) {
+                 boolean status = cache.remove(cacheKey, keySerializer);
+                 logger.debug("Removal of expired cached entry with key {} 
returned {}", new Object[]{cacheKey, status});
+                 // this should typically result in duplicate being 
false...but, better safe than sorry
+                 duplicate = !cache.putIfAbsent(cacheKey, cacheValue, 
keySerializer, valueSerializer);
+             }
+             if (duplicate) {
+                 session.getProvenanceReporter().route(flowFile, 
REL_DUPLICATE, "Duplicate of: " + ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME);
+                 String originalFlowFileDescription = 
originalCacheValue.getDescription();
+                 flowFile = session.putAttribute(flowFile, 
ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME, originalFlowFileDescription);
+                 session.transfer(flowFile, REL_DUPLICATE);
+                 logger.info("Found {} to be a duplicate of FlowFile with 
description {}", new Object[]{flowFile, originalFlowFileDescription});
+                 session.adjustCounter("Duplicates Detected", 1L, false);
+             } else {
+                 session.getProvenanceReporter().route(flowFile, 
REL_NON_DUPLICATE);
+                 session.transfer(flowFile, REL_NON_DUPLICATE);
+                 logger.info("Could not find a duplicate entry in cache for 
{}; routing to non-duplicate", new Object[]{flowFile});
+                 session.adjustCounter("Non-Duplicate Files Processed", 1L, 
false);
+             }
+         } catch (final IOException e) {
+             flowFile = session.penalize(flowFile);
+             session.transfer(flowFile, REL_FAILURE);
+             logger.error("Unable to communicate with cache when processing {} 
due to {}", new Object[]{flowFile, e});
+         }
+     }
+ 
+     private static class CacheValue {
+ 
+         private final String description;
+         private final long entryTimeMS;
+ 
+         public CacheValue(String description, long entryTimeMS) {
+             this.description = description;
+             this.entryTimeMS = entryTimeMS;
+         }
+ 
+         public String getDescription() {
+             return description;
+         }
+ 
+         public long getEntryTimeMS() {
+             return entryTimeMS;
+         }
+ 
+     }
+ 
+     private static class CacheValueSerializer implements 
Serializer<CacheValue> {
+ 
+         @Override
+         public void serialize(final CacheValue entry, final OutputStream out) 
throws SerializationException, IOException {
+             long time = entry.getEntryTimeMS();
+             byte[] writeBuffer = new byte[8];
+             writeBuffer[0] = (byte) (time >>> 56);
+             writeBuffer[1] = (byte) (time >>> 48);
+             writeBuffer[2] = (byte) (time >>> 40);
+             writeBuffer[3] = (byte) (time >>> 32);
+             writeBuffer[4] = (byte) (time >>> 24);
+             writeBuffer[5] = (byte) (time >>> 16);
+             writeBuffer[6] = (byte) (time >>> 8);
+             writeBuffer[7] = (byte) (time);
+             out.write(writeBuffer, 0, 8);
+             
out.write(entry.getDescription().getBytes(StandardCharsets.UTF_8));
+         }
+     }
+ 
+     private static class CacheValueDeserializer implements 
Deserializer<CacheValue> {
+ 
+         @Override
+         public CacheValue deserialize(final byte[] input) throws 
DeserializationException, IOException {
+             if (input.length == 0) {
+                 return null;
+             }
+             long time = ((long) input[0] << 56)
+                     + ((long) (input[1] & 255) << 48)
+                     + ((long) (input[2] & 255) << 40)
+                     + ((long) (input[3] & 255) << 32)
+                     + ((long) (input[4] & 255) << 24)
+                     + ((input[5] & 255) << 16)
+                     + ((input[6] & 255) << 8)
+                     + ((input[7] & 255));
+             String description = new String(input, 8, input.length - 8, 
StandardCharsets.UTF_8);
+             CacheValue value = new CacheValue(description, time);
+             return value;
+         }
+     }
+ 
+     private static class StringSerializer implements Serializer<String> {
+ 
+         @Override
+         public void serialize(final String value, final OutputStream out) 
throws SerializationException, IOException {
+             out.write(value.getBytes(StandardCharsets.UTF_8));
+         }
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
index 0000000,3ac55d2..8fb8ad4
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
@@@ -1,0 -1,498 +1,498 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.LinkedHashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.TreeSet;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.loading.LoadDistributionListener;
+ import org.apache.nifi.loading.LoadDistributionService;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
 -import 
org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ import org.apache.commons.lang3.StringUtils;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @TriggerWhenAnyDestinationAvailable
+ @Tags({"distribute", "load balance", "route", "round robin", "weighted"})
+ @CapabilityDescription("Distributes FlowFiles to downstream processors based 
on a Distribution Strategy. If using the Round Robin "
+         + "strategy, the default is to assign each destination a weighting of 
1 (evenly distributed). However, optional properties"
+         + "can be added to the change this; adding a property with the name 
'5' and value '10' means that the relationship with name "
+         + "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
+ public class DistributeLoad extends AbstractProcessor {
+ 
+     public static final String STRATEGY_ROUND_ROBIN = "round robin";
+     public static final String STRATEGY_NEXT_AVAILABLE = "next available";
+     public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load 
distribution service";
+ 
+     public static final PropertyDescriptor NUM_RELATIONSHIPS = new 
PropertyDescriptor.Builder()
+             .name("Number of Relationships")
+             .description("Determines the number of Relationships to which the 
load should be distributed")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .defaultValue("1")
+             .build();
+     public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new 
PropertyDescriptor.Builder()
+             .name("Distribution Strategy")
+             .description(
+                     "Determines how the load will be distributed. If using 
Round Robin, will not distribute any FlowFiles unless all destinations can 
accept FlowFiles; when using Next Available, will distribute FlowFiles as long 
as at least 1 destination can accept FlowFiles.")
+             .required(true)
+             .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, 
STRATEGY_LOAD_DISTRIBUTION_SERVICE)
+             .defaultValue(STRATEGY_ROUND_ROBIN)
+             .build();
+ 
+     public static final PropertyDescriptor HOSTNAMES = new 
PropertyDescriptor.Builder()
+             .name("Hostnames")
+             .description("List of remote servers to distribute across. Each 
server must be FQDN and use either ',', ';', or [space] as a delimiter")
+             .required(true)
+             .addValidator(new Validator() {
+ 
+                 @Override
+                 public ValidationResult validate(String subject, String 
input, ValidationContext context) {
+                     ValidationResult result = new ValidationResult.Builder()
+                     .subject(subject)
+                     .valid(true)
+                     .input(input)
+                     .explanation("Good FQDNs")
+                     .build();
+                     if (null == input) {
+                         result = new ValidationResult.Builder()
+                         .subject(subject)
+                         .input(input)
+                         .valid(false)
+                         .explanation("Need to specify delimited list of 
FQDNs")
+                         .build();
+                         return result;
+                     }
+                     String[] hostNames = input.split("(?:,+|;+|\\s+)");
+                     for (String hostName : hostNames) {
+                         if (StringUtils.isNotBlank(hostName) && 
!hostName.contains(".")) {
+                             result = new ValidationResult.Builder()
+                             .subject(subject)
+                             .input(input)
+                             .valid(false)
+                             .explanation("Need a FQDN rather than a simple 
host name.")
+                             .build();
+                             return result;
+                         }
+                     }
+                     return result;
+                 }
+             })
+             .build();
+     public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE 
= new PropertyDescriptor.Builder()
+             .name("Load Distribution Service ID")
+             .description("The identifier of the Load Distribution Service")
+             .required(true)
+             .identifiesControllerService(LoadDistributionService.class)
+             .build();
+ 
+     private List<PropertyDescriptor> properties;
+     private final AtomicReference<Set<Relationship>> relationshipsRef = new 
AtomicReference<>();
+     private final AtomicReference<DistributionStrategy> strategyRef = new 
AtomicReference<DistributionStrategy>(new RoundRobinStrategy());
+     private final AtomicReference<List<Relationship>> 
weightedRelationshipListRef = new AtomicReference<>();
+     private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
+     private volatile LoadDistributionListener myListener;
+     private final AtomicBoolean doSetProps = new AtomicBoolean(true);
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(createRelationship(1));
+         relationshipsRef.set(Collections.unmodifiableSet(relationships));
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(NUM_RELATIONSHIPS);
+         properties.add(DISTRIBUTION_STRATEGY);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     private static Relationship createRelationship(final int num) {
+         return new Relationship.Builder().name(String.valueOf(num)).build();
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationshipsRef.get();
+     }
+ 
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+         if (descriptor.equals(NUM_RELATIONSHIPS)) {
+             final Set<Relationship> relationships = new HashSet<>();
+             for (int i = 1; i <= Integer.parseInt(newValue); i++) {
+                 relationships.add(createRelationship(i));
+             }
+             
this.relationshipsRef.set(Collections.unmodifiableSet(relationships));
+         } else if (descriptor.equals(DISTRIBUTION_STRATEGY)) {
+             switch (newValue.toLowerCase()) {
+                 case STRATEGY_ROUND_ROBIN:
+                     strategyRef.set(new RoundRobinStrategy());
+                     break;
+                 case STRATEGY_NEXT_AVAILABLE:
+                     strategyRef.set(new NextAvailableStrategy());
+                     break;
+                 case STRATEGY_LOAD_DISTRIBUTION_SERVICE:
+                     strategyRef.set(new LoadDistributionStrategy());
+             }
+             doSetProps.set(true);
+             doCustomValidate.set(true);
+         }
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         if (strategyRef.get() instanceof LoadDistributionStrategy && 
doSetProps.getAndSet(false)) {
+             final List<PropertyDescriptor> props = new 
ArrayList<>(properties);
+             props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
+             props.add(HOSTNAMES);
+             this.properties = Collections.unmodifiableList(props);
+         } else if (doSetProps.getAndSet(false)) {
+             final List<PropertyDescriptor> props = new ArrayList<>();
+             props.add(NUM_RELATIONSHIPS);
+             props.add(DISTRIBUTION_STRATEGY);
+             this.properties = Collections.unmodifiableList(props);
+         }
+         return properties;
+     }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+         // validate that the property name is valid.
+         final int numRelationships = this.relationshipsRef.get().size();
+         try {
+             final int value = Integer.parseInt(propertyDescriptorName);
+             if (value <= 0 || value > numRelationships) {
+                 return new PropertyDescriptor.Builder().addValidator(new 
InvalidPropertyNameValidator(propertyDescriptorName))
+                         .name(propertyDescriptorName).build();
+             }
+         } catch (final NumberFormatException e) {
+             return new PropertyDescriptor.Builder().addValidator(new 
InvalidPropertyNameValidator(propertyDescriptorName))
+                     .name(propertyDescriptorName).build();
+         }
+ 
+         // validate that the property value is valid
+         return new 
PropertyDescriptor.Builder().addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                 .name(propertyDescriptorName).dynamic(true).build();
+     }
+ 
+     @Override
+     protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+         Collection<ValidationResult> results = new ArrayList<>();
+         if (doCustomValidate.getAndSet(false)) {
+             String distStrat = 
validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue();
+             if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
+                 // make sure Hostnames and Controller service are set
+                 PropertyValue propDesc = 
validationContext.getProperty(HOSTNAMES);
+                 if (null == propDesc || null == propDesc.getValue() || 
propDesc.getValue().isEmpty()) {
+                     results.add(new ValidationResult.Builder()
+                             .subject(HOSTNAMES.getName())
+                             .explanation("Must specify Hostnames when using 
'Load Distribution Strategy'")
+                             .valid(false)
+                             .build());
+                 }
+                 propDesc = 
validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
+                 if (null == propDesc || null == propDesc.getValue() || 
propDesc.getValue().isEmpty()) {
+                     results.add(new ValidationResult.Builder()
+                             
.subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName())
+                             .explanation("Must specify 'Load Distribution 
Service ID' when using 'Load Distribution Strategy'")
+                             .valid(false)
+                             .build());
+                 }
+                 if (results.isEmpty()) {
+                     int numRels = 
validationContext.getProperty(NUM_RELATIONSHIPS).asInteger();
+                     String hostNamesValue = 
validationContext.getProperty(HOSTNAMES).getValue();
+                     String[] hostNames = 
hostNamesValue.split("(?:,+|;+|\\s+)");
+                     int numHosts = 0;
+                     for (String hostName : hostNames) {
+                         if (StringUtils.isNotBlank(hostName)) {
+                             hostNames[numHosts++] = hostName;
+                         }
+                     }
+                     if (numHosts > numRels) {
+                         results.add(new ValidationResult.Builder()
+                                 .subject("Number of Relationships and 
Hostnames")
+                                 .explanation("Number of Relationships must be 
equal to, or greater than, the number of host names")
+                                 .valid(false)
+                                 .build());
+                     } else {
+                         // create new relationships with descriptions of 
hostname
+                         Set<Relationship> relsWithDesc = new TreeSet<>();
+                         for (int i = 0; i < numHosts; i++) {
+                             relsWithDesc.add(new 
Relationship.Builder().name(String.valueOf(i + 
1)).description(hostNames[i]).build());
+                         }
+                         // add add'l rels if configuration requires it...it 
probably shouldn't
+                         for (int i = numHosts + 1; i <= numRels; i++) {
+                             relsWithDesc.add(createRelationship(i));
+                         }
+                         
relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc));
+                     }
+                 }
+             }
+         }
+         return results;
+     }
+ 
    /**
     * Rebuilds the relationship-number -> weight map each time the processor is
     * scheduled, then materializes it into the weighted relationship list used by
     * the distribution strategies.
     *
     * Two configuration modes are visible here:
     * - Load Distribution Service: weights come from a LoadDistributionService
     *   controller service, keyed by hostname (each relationship's description
     *   holds its hostname). A listener is registered so later load updates
     *   re-derive the weights.
     * - Otherwise: every relationship defaults to weight 1, overridden by any
     *   dynamic property whose name is the relationship number.
     *
     * @param context the ProcessContext supplying the configured properties
     */
    @OnScheduled
    public void createWeightedList(final ProcessContext context) {
        // NOTE(review): this LinkedHashMap is captured by the anonymous listener
        // below and mutated again from the service's update callback, with no
        // synchronization — looks like a potential cross-thread race; confirm the
        // service's callback threading contract.
        final Map<Integer, Integer> weightings = new LinkedHashMap<>();

        String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue();
        if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
            // hostnames may be separated by commas, semicolons, or whitespace
            String hostNamesValue = context.getProperty(HOSTNAMES).getValue();
            String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
            Set<String> hostNameSet = new HashSet<>();
            for (String hostName : hostNames) {
                if (StringUtils.isNotBlank(hostName)) {
                    hostNameSet.add(hostName);
                }
            }
            LoadDistributionService svc = context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class);
            myListener = new LoadDistributionListener() {

                @Override
                public void update(Map<String, Integer> loadInfo) {
                    // re-derive weights from the latest load report; hosts missing
                    // from the report fall back to weight 1
                    for (Relationship rel : relationshipsRef.get()) {
                        String hostname = rel.getDescription();
                        Integer weight = 1;
                        if (loadInfo.containsKey(hostname)) {
                            weight = loadInfo.get(hostname);
                        }
                        weightings.put(Integer.decode(rel.getName()), weight);
                    }
                    updateWeightedRelationships(weightings);
                }
            };

            // initial load snapshot; same fallback-to-1 logic as the listener above
            Map<String, Integer> loadInfo = svc.getLoadDistribution(hostNameSet, myListener);
            for (Relationship rel : relationshipsRef.get()) {
                String hostname = rel.getDescription();
                Integer weight = 1;
                if (loadInfo.containsKey(hostname)) {
                    weight = loadInfo.get(hostname);
                }
                weightings.put(Integer.decode(rel.getName()), weight);
            }

        } else {
            // default every relationship to weight 1 ...
            final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
            for (int i = 1; i <= numRelationships; i++) {
                weightings.put(i, 1);
            }
            // ... then apply user overrides: any non-built-in property is a dynamic
            // property whose name is the relationship number and whose value is its weight
            for (final PropertyDescriptor propDesc : context.getProperties().keySet()) {
                if (!this.properties.contains(propDesc)) {
                    final int relationship = Integer.parseInt(propDesc.getName());
                    final int weighting = context.getProperty(propDesc).asInteger();
                    weightings.put(relationship, weighting);
                }
            }
        }
        updateWeightedRelationships(weightings);
    }
+ 
+     private void updateWeightedRelationships(final Map<Integer, Integer> 
weightings) {
+         final List<Relationship> relationshipList = new ArrayList<>();
+         for (final Map.Entry<Integer, Integer> entry : weightings.entrySet()) 
{
+             final String relationshipName = String.valueOf(entry.getKey());
+             final Relationship relationship = new 
Relationship.Builder().name(relationshipName).build();
+             for (int i = 0; i < entry.getValue(); i++) {
+                 relationshipList.add(relationship);
+             }
+         }
+ 
+         
this.weightedRelationshipListRef.set(Collections.unmodifiableList(relationshipList));
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         final FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final DistributionStrategy strategy = strategyRef.get();
+         final Set<Relationship> available = 
context.getAvailableRelationships();
+         final int numRelationships = 
context.getProperty(NUM_RELATIONSHIPS).asInteger();
+         final boolean allDestinationsAvailable = (available.size() == 
numRelationships);
+         if (!allDestinationsAvailable && 
strategy.requiresAllDestinationsAvailable()) {
+             return;
+         }
+ 
+         final Relationship relationship = strategy.mapToRelationship(context, 
flowFile);
+         if (relationship == null) {
+             // can't transfer the FlowFiles. Roll back and yield
+             session.rollback();
+             context.yield();
+             return;
+         }
+ 
+         session.transfer(flowFile, relationship);
+         session.getProvenanceReporter().route(flowFile, relationship);
+     }
+ 
+     private static class InvalidPropertyNameValidator implements Validator {
+ 
+         private final String propertyName;
+ 
+         public InvalidPropertyNameValidator(final String propertyName) {
+             this.propertyName = propertyName;
+         }
+ 
+         @Override
+         public ValidationResult validate(final String subject, final String 
input, final ValidationContext validationContext) {
+             return new ValidationResult.Builder().subject("Property 
Name").input(propertyName)
+                     .explanation("Property Name must be a positive integer 
between 1 and the number of relationships (inclusive)")
+                     .valid(false)
+                     .build();
+         }
+     }
+ 
    /**
     * Strategy for selecting the Relationship that a FlowFile is routed to.
     * Implementations must be thread-safe.
     */
    private static interface DistributionStrategy {

        /**
         * Returns the Relationship the given FlowFile should be transferred to, or
         * <code>null</code> if the needed relationships are not available to accept
         * files.
         *
         * @param context the ProcessContext, used to query available relationships
         * @param flowFile the FlowFile to map to a Relationship
         * @return the chosen Relationship, or <code>null</code> if none can accept the file
         */
        Relationship mapToRelationship(ProcessContext context, FlowFile flowFile);

        /**
         * @return <code>true</code> if the strategy should only run when every
         * relationship is able to accept a FlowFile; <code>false</code> otherwise
         */
        boolean requiresAllDestinationsAvailable();
    }
+ 
+     private class LoadDistributionStrategy implements DistributionStrategy {
+ 
+         private final AtomicLong counter = new AtomicLong(0L);
+ 
+         @Override
+         public Relationship mapToRelationship(final ProcessContext context, 
final FlowFile flowFile) {
+             final List<Relationship> relationshipList = 
DistributeLoad.this.weightedRelationshipListRef.get();
+             final int numRelationships = relationshipList.size();
+ 
+             // create a HashSet that contains all of the available 
relationships, as calling #contains on HashSet
+             // is much faster than calling it on a List
+             boolean foundFreeRelationship = false;
+             Relationship relationship = null;
+ 
+             int attempts = 0;
+             while (!foundFreeRelationship) {
+                 final long counterValue = counter.getAndIncrement();
+                 final int idx = (int) (counterValue % numRelationships);
+                 relationship = relationshipList.get(idx);
+                 foundFreeRelationship = 
context.getAvailableRelationships().contains(relationship);
+                 if (++attempts % numRelationships == 0 && 
!foundFreeRelationship) {
+                     return null;
+                 }
+             }
+ 
+             return relationship;
+         }
+ 
+         @Override
+         public boolean requiresAllDestinationsAvailable() {
+             return false;
+         }
+ 
+     }
+ 
+     private class RoundRobinStrategy implements DistributionStrategy {
+ 
+         private final AtomicLong counter = new AtomicLong(0L);
+ 
+         @Override
+         public Relationship mapToRelationship(final ProcessContext context, 
final FlowFile flowFile) {
+             final List<Relationship> relationshipList = 
DistributeLoad.this.weightedRelationshipListRef.get();
+             final long counterValue = counter.getAndIncrement();
+             final int idx = (int) (counterValue % relationshipList.size());
+             final Relationship relationship = relationshipList.get(idx);
+             return relationship;
+         }
+ 
+         @Override
+         public boolean requiresAllDestinationsAvailable() {
+             return true;
+         }
+     }
+ 
+     private class NextAvailableStrategy implements DistributionStrategy {
+ 
+         private final AtomicLong counter = new AtomicLong(0L);
+ 
+         @Override
+         public Relationship mapToRelationship(final ProcessContext context, 
final FlowFile flowFile) {
+             final List<Relationship> relationshipList = 
DistributeLoad.this.weightedRelationshipListRef.get();
+             final int numRelationships = relationshipList.size();
+ 
+             // create a HashSet that contains all of the available 
relationships, as calling #contains on HashSet
+             // is much faster than calling it on a List
+             boolean foundFreeRelationship = false;
+             Relationship relationship = null;
+ 
+             int attempts = 0;
+             while (!foundFreeRelationship) {
+                 final long counterValue = counter.getAndIncrement();
+                 final int idx = (int) (counterValue % numRelationships);
+                 relationship = relationshipList.get(idx);
+                 foundFreeRelationship = 
context.getAvailableRelationships().contains(relationship);
+                 if (++attempts % numRelationships == 0 && 
!foundFreeRelationship) {
+                     return null;
+                 }
+             }
+ 
+             return relationship;
+         }
+ 
+         @Override
+         public boolean requiresAllDestinationsAvailable() {
+             return false;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
index 0000000,eb079bb..69cb18e
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
@@@ -1,0 -1,263 +1,263 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.security.util.EncryptionMethod;
+ import org.apache.nifi.util.StopWatch;
+ 
+ import org.bouncycastle.jce.provider.BouncyCastleProvider;
+ 
+ import javax.crypto.*;
+ import javax.crypto.spec.PBEKeySpec;
+ import javax.crypto.spec.PBEParameterSpec;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.security.InvalidAlgorithmParameterException;
+ import java.security.InvalidKeyException;
+ import java.security.SecureRandom;
+ import java.security.Security;
+ import java.text.Normalizer;
+ import java.util.*;
+ import java.util.concurrent.TimeUnit;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"encryption", "decryption", "password", "JCE"})
+ @CapabilityDescription("Encrypts or Decrypts a FlowFile using a randomly 
generated salt")
+ public class EncryptContent extends AbstractProcessor {
+ 
+     public static final String ENCRYPT_MODE = "Encrypt";
+     public static final String DECRYPT_MODE = "Decrypt";
+     public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
+     public static final int DEFAULT_SALT_SIZE = 8;
+ 
+     public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+             .name("Mode")
+             .description("Specifies whether the content should be encrypted 
or decrypted")
+             .required(true)
+             .allowableValues(ENCRYPT_MODE, DECRYPT_MODE)
+             .defaultValue(ENCRYPT_MODE)
+             .build();
+     public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new 
PropertyDescriptor.Builder()
+             .name("Encryption Algorithm")
+             .description("The Encryption Algorithm to use")
+             .required(true)
+             .allowableValues(EncryptionMethod.values())
+             .defaultValue(EncryptionMethod.MD5_256AES.name())
+             .build();
+     public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+             .name("Password")
+             .description("The Password to use for encrypting or decrypting 
the data")
+             .required(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .sensitive(true)
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("Any FlowFile that is 
successfully encrypted or decrypted will be routed to success").build();
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("Any FlowFile that cannot be 
encrypted or decrypted will be routed to failure").build();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     static {
+         // add BouncyCastle encryption providers
+         Security.addProvider(new BouncyCastleProvider());
+     }
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(MODE);
+         properties.add(ENCRYPTION_ALGORITHM);
+         properties.add(PASSWORD);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final String method = 
context.getProperty(ENCRYPTION_ALGORITHM).getValue();
+         final EncryptionMethod encryptionMethod = 
EncryptionMethod.valueOf(method);
+         final String providerName = encryptionMethod.getProvider();
+         final String algorithm = encryptionMethod.getAlgorithm();
+ 
+         final String password = context.getProperty(PASSWORD).getValue();
+         final char[] normalizedPassword = Normalizer.normalize(password, 
Normalizer.Form.NFC).toCharArray();
+         final PBEKeySpec pbeKeySpec = new PBEKeySpec(normalizedPassword);
+ 
+         final SecureRandom secureRandom;
+         final SecretKeyFactory factory;
+         final SecretKey secretKey;
+         final Cipher cipher;
+         try {
+             secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
+             secureRandom.setSeed(System.currentTimeMillis());
+             factory = SecretKeyFactory.getInstance(algorithm, providerName);
+             secretKey = factory.generateSecret(pbeKeySpec);
+             cipher = Cipher.getInstance(algorithm, providerName);
+         } catch (final Exception e) {
+             logger.error("failed to initialize Encryption/Decryption 
algorithm due to {}", new Object[]{e});
+             session.transfer(flowFile, REL_FAILURE);
+             return;
+         }
+ 
+         final int algorithmBlockSize = cipher.getBlockSize();
+         final int saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : 
DEFAULT_SALT_SIZE;
+ 
+         final StopWatch stopWatch = new StopWatch(true);
+         if 
(context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE)) {
+             final byte[] salt = new byte[saltSize];
+             secureRandom.nextBytes(salt);
+ 
+             final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 
1000);
+             try {
+                 cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
+             } catch (final InvalidKeyException | 
InvalidAlgorithmParameterException e) {
+                 logger.error("unable to encrypt {} due to {}", new 
Object[]{flowFile, e});
+                 session.transfer(flowFile, REL_FAILURE);
+                 return;
+             }
+ 
+             flowFile = session.write(flowFile, new EncryptCallback(cipher, 
salt));
+             logger.info("Successfully encrypted {}", new Object[]{flowFile});
+         } else {
+             if (flowFile.getSize() <= saltSize) {
+                 logger.error("Cannot decrypt {} because its file size is not 
greater than the salt size", new Object[]{flowFile});
+                 session.transfer(flowFile, REL_FAILURE);
+                 return;
+             }
+ 
+             flowFile = session.write(flowFile, new DecryptCallback(cipher, 
secretKey, saltSize));
+             logger.info("successfully decrypted {}", new Object[]{flowFile});
+         }
+ 
+         session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+         session.transfer(flowFile, REL_SUCCESS);
+     }
+ 
+     private static class DecryptCallback implements StreamCallback {
+ 
+         private final Cipher cipher;
+         private final SecretKey secretKey;
+         private final int saltSize;
+ 
+         public DecryptCallback(final Cipher cipher, final SecretKey 
secretKey, final int saltSize) {
+             this.cipher = cipher;
+             this.secretKey = secretKey;
+             this.saltSize = saltSize;
+         }
+ 
+         @Override
+         public void process(final InputStream in, final OutputStream out) 
throws IOException {
+             final byte[] salt = new byte[saltSize];
+             StreamUtils.fillBuffer(in, salt);
+ 
+             final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 
1000);
+             try {
+                 cipher.init(Cipher.DECRYPT_MODE, secretKey, parameterSpec);
+             } catch (final Exception e) {
+                 throw new ProcessException(e);
+             }
+ 
+             final byte[] buffer = new byte[65536];
+             int len;
+             while ((len = in.read(buffer)) > 0) {
+                 final byte[] decryptedBytes = cipher.update(buffer, 0, len);
+                 if (decryptedBytes != null) {
+                     out.write(decryptedBytes);
+                 }
+             }
+ 
+             try {
+                 out.write(cipher.doFinal());
+             } catch (final Exception e) {
+                 throw new ProcessException(e);
+             }
+         }
+     }
+ 
+     private static class EncryptCallback implements StreamCallback {
+ 
+         private final Cipher cipher;
+         private final byte[] salt;
+ 
+         public EncryptCallback(final Cipher cipher, final byte[] salt) {
+             this.cipher = cipher;
+             this.salt = salt;
+         }
+ 
+         @Override
+         public void process(final InputStream in, final OutputStream out) 
throws IOException {
+             out.write(salt);
+ 
+             final byte[] buffer = new byte[65536];
+             int len;
+             while ((len = in.read(buffer)) > 0) {
+                 final byte[] encryptedBytes = cipher.update(buffer, 0, len);
+                 if (encryptedBytes != null) {
+                     out.write(encryptedBytes);
+                 }
+             }
+ 
+             try {
+                 out.write(cipher.doFinal());
+             } catch (final IllegalBlockSizeException | BadPaddingException e) 
{
+                 throw new ProcessException(e);
+             }
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateRegularExpression.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateRegularExpression.java
index 0000000,7697d06..4140943
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateRegularExpression.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateRegularExpression.java
@@@ -1,0 -1,294 +1,294 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.nio.charset.Charset;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ import org.apache.commons.lang3.StringUtils;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"evaluate", "Text", "Regular Expression", "regex", "experimental"})
+ @CapabilityDescription(
+         "Evaluates one or more Regular Expressions against the content of a 
FlowFile.  "
+         + "The results of those Regular Expressions are assigned to FlowFile 
Attributes.  "
+         + "Regular Expressions are entered by adding user-defined properties; 
"
+         + "the name of the property maps to the Attribute Name into which the 
result will be placed.  "
+         + "The value of the property must be a valid Regular Expressions with 
exactly one capturing group.  "
+         + "If the Regular Expression matches more than once, only the first 
match will be used.  "
+         + "If any provided Regular Expression matches, the FlowFile(s) will 
be routed to 'matched'. "
+         + "If no provided Regular Expression matches, the FlowFile will be 
routed to 'unmatched' and no attributes will be applied to the FlowFile.")
+ 
+ public class EvaluateRegularExpression extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+             .name("Character Set")
+             .description("The Character Set in which the file is encoded")
+             .required(true)
+             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+             .defaultValue("UTF-8")
+             .build();
+ 
+     public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+             .name("Maximum Buffer Size")
+             .description("Specifies the maximum amount of data to buffer (per 
file) in order to apply the regular expressions.  Files larger than the 
specified maximum will not be fully evaluated.")
+             .required(true)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .build();
+ 
+     public static final PropertyDescriptor CANON_EQ = new 
PropertyDescriptor.Builder()
+             .name("Enable Canonical Equivalence")
+             .description("Indicates that two characters match only when their 
full canonical decompositions match.")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor CASE_INSENSITIVE = new 
PropertyDescriptor.Builder()
+             .name("Enable Case-insensitive Matching")
+             .description("Indicates that two characters match even if they 
are in a different case.  Can also be specified via the embeded flag (?i).")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor COMMENTS = new 
PropertyDescriptor.Builder()
+             .name("Permit Whitespace and Comments in Pattern")
+             .description("In this mode, whitespace is ignored, and embedded 
comments starting with # are ignored until the end of a line.  Can also be 
specified via the embeded flag (?x).")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor DOTALL = new 
PropertyDescriptor.Builder()
+             .name("Enable DOTALL Mode")
+             .description("Indicates that the expression '.' should match any 
character, including a line terminator.  Can also be specified via the embeded 
flag (?s).")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor LITERAL = new 
PropertyDescriptor.Builder()
+             .name("Enable Literal Parsing of the Pattern")
+             .description("Indicates that Metacharacters and escape characters 
should be given no special meaning.")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor MULTILINE = new 
PropertyDescriptor.Builder()
+             .name("Enable Multiline Mode")
+             .description("Indicates that '^' and '$' should match just after 
and just before a line terminator or end of sequence, instead of only the 
begining or end of the entire input.  Can also be specified via the embeded 
flag (?m).")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor UNICODE_CASE = new 
PropertyDescriptor.Builder()
+             .name("Enable Unicode-aware Case Folding")
+             .description("When used with 'Enable Case-insensitive Matching', 
matches in a manner consistent with the Unicode Standard.  Can also be 
specified via the embeded flag (?u).")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor UNICODE_CHARACTER_CLASS = new 
PropertyDescriptor.Builder()
+             .name("Enable Unicode Predefined Character Classes")
+             .description("Specifies conformance with the Unicode Technical 
Standard #18: Unicode Regular Expression Annex C: Compatibility Properties.  
Can also be specified via the embeded flag (?U).")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor UNIX_LINES = new 
PropertyDescriptor.Builder()
+             .name("Enable Unix Lines Mode")
+             .description("Indicates that only the '\n' line terminator is 
recognized int the behavior of '.', '^', and '$'.  Can also be specified via 
the embeded flag (?d).")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final Relationship REL_MATCH = new Relationship.Builder()
+             .name("matched")
+             .description(
+                     "FlowFiles are routed to this relationship when the 
Regular Expression is successfully evaluated and the FlowFile "
+                     + "is modified as a result")
+             .build();
+ 
+     public static final Relationship REL_NO_MATCH = new Relationship.Builder()
+             .name("unmatched")
+             .description(
+                     "FlowFiles are routed to this relationship when no 
provided Regular Expression matches the content of the FlowFile")
+             .build();
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_MATCH);
+         relationships.add(REL_NO_MATCH);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(CHARACTER_SET);
+         properties.add(MAX_BUFFER_SIZE);
+         properties.add(CANON_EQ);
+         properties.add(CASE_INSENSITIVE);
+         properties.add(COMMENTS);
+         properties.add(DOTALL);
+         properties.add(LITERAL);
+         properties.add(MULTILINE);
+         properties.add(UNICODE_CASE);
+         properties.add(UNICODE_CHARACTER_CLASS);
+         properties.add(UNIX_LINES);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
    @Override
    public Set<Relationship> getRelationships() {
        // unmodifiable set built once in init()
        return relationships;
    }
+ 
    /** @return the static property descriptors built in {@code init}, as an unmodifiable list */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+         return new PropertyDescriptor.Builder()
+                 .name(propertyDescriptorName)
+                 .expressionLanguageSupported(false)
+                 .addValidator(StandardValidators.createRegexValidator(1, 1, 
true))
+                 .required(false)
+                 .dynamic(true)
+                 .build();
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         final List<FlowFile> flowFileBatch = session.get(50);
+         if (flowFileBatch.isEmpty()) {
+             return;
+         }
+         final ProcessorLog logger = getLogger();
+ 
+         // Compile the Regular Expressions
+         Map<String, Matcher> regexMap = new HashMap<>();
+         for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
+             if (!entry.getKey().isDynamic()) {
+                 continue;
+             }
+             final int flags = getCompileFlags(context);
+             final Matcher matcher = Pattern.compile(entry.getValue(), 
flags).matcher("");
+             regexMap.put(entry.getKey().getName(), matcher);
+         }
+ 
+         final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+ 
+         final int maxBufferSize = 
context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ 
+         for (FlowFile flowFile : flowFileBatch) {
+ 
+             final Map<String, String> regexResults = new HashMap<>();
+ 
+             final byte[] buffer = new byte[maxBufferSize];
+ 
+             session.read(flowFile, new InputStreamCallback() {
+                 @Override
+                 public void process(InputStream in) throws IOException {
+                     StreamUtils.fillBuffer(in, buffer, false);
+                 }
+             });
+ 
+             final int flowFileSize = Math.min((int) flowFile.getSize(), 
maxBufferSize);
+ 
+             final String contentString = new String(buffer, 0, flowFileSize, 
charset);
+ 
+             for (final Map.Entry<String, Matcher> entry : 
regexMap.entrySet()) {
+ 
+                 final Matcher matcher = entry.getValue();
+ 
+                 matcher.reset(contentString);
+ 
+                 if (matcher.find()) {
+                     final String group = matcher.group(1);
+                     if (!StringUtils.isBlank(group)) {
+                         regexResults.put(entry.getKey(), group);
+                     }
+                 }
+             }
+ 
+             if (!regexResults.isEmpty()) {
+                 flowFile = session.putAllAttributes(flowFile, regexResults);
+                 session.getProvenanceReporter().modifyAttributes(flowFile);
+                 session.transfer(flowFile, REL_MATCH);
+                 logger.info("Matched {} Regular Expressions and added 
attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile});
+             } else {
+                 session.transfer(flowFile, REL_NO_MATCH);
+                 logger.info("Did not match any Regular Expressions for  
FlowFile {}", new Object[]{flowFile});
+             }
+ 
+         } // end flowFileLoop
+     }
+ 
+     int getCompileFlags(ProcessContext context) {
+         int flags = (context.getProperty(UNIX_LINES).asBoolean() ? 
Pattern.UNIX_LINES : 0)
+                 | (context.getProperty(CASE_INSENSITIVE).asBoolean() ? 
Pattern.CASE_INSENSITIVE : 0)
+                 | (context.getProperty(COMMENTS).asBoolean() ? 
Pattern.COMMENTS : 0)
+                 | (context.getProperty(MULTILINE).asBoolean() ? 
Pattern.MULTILINE : 0)
+                 | (context.getProperty(LITERAL).asBoolean() ? Pattern.LITERAL 
: 0)
+                 | (context.getProperty(DOTALL).asBoolean() ? Pattern.DOTALL : 
0)
+                 | (context.getProperty(UNICODE_CASE).asBoolean() ? 
Pattern.UNICODE_CASE : 0)
+                 | (context.getProperty(CANON_EQ).asBoolean() ? 
Pattern.CANON_EQ : 0)
+                 | (context.getProperty(UNICODE_CHARACTER_CLASS).asBoolean() ? 
Pattern.UNICODE_CHARACTER_CLASS : 0);
+         return flags;
+     }
+ }

Reply via email to