http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java index 0000000,1fea36d..8910fdc mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java @@@ -1,0 -1,249 +1,249 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.IOException; + import java.io.OutputStream; + import java.nio.charset.StandardCharsets; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + import java.util.concurrent.TimeUnit; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.distributed.cache.client.Deserializer; + import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; + import org.apache.nifi.distributed.cache.client.Serializer; + import org.apache.nifi.distributed.cache.client.exception.DeserializationException; + import org.apache.nifi.distributed.cache.client.exception.SerializationException; + import org.apache.nifi.expression.AttributeExpression.ResultType; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.util.StandardValidators; + + import org.apache.commons.lang3.StringUtils; + + @EventDriven + @SupportsBatching + @Tags({"experimental", "hash", "dupe", "duplicate", "dedupe"}) + @CapabilityDescription("Caches a value, computed from FlowFile attributes, for each incoming 
FlowFile and determines if the cached value has already been seen. " + + "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's" + + "\"description\", which is specified in the <FlowFile Description> property. If the FlowFile is not determined to be a duplicate, the Processor " + + "routes the FlowFile to 'non-duplicate'") + public class DetectDuplicate extends AbstractProcessor { + + public static final String ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME = "original.flowfile.description"; + + public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() + .name("Distributed Cache Service") + .description("The Controller Service that is used to cache unique identifiers, used to determine duplicates") + .required(true) + .identifiesControllerService(DistributedMapCacheClient.class) + .build(); + public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder() + .name("Cache Entry Identifier") + .description( + "A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated " + + "against a FlowFile in order to determine the value used to identify duplicates; it is this value that is cached") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .defaultValue("${hash.value}") + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor FLOWFILE_DESCRIPTION = new PropertyDescriptor.Builder() + .name("FlowFile Description") + .description( + "When a FlowFile is added to the cache, this value is stored along with it so that if a duplicate is found, this description of the original FlowFile will be added to the duplicate's \"" + + ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME + "\" attribute") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, 
true)) + .expressionLanguageSupported(true) + .defaultValue("") + .build(); + + public static final PropertyDescriptor AGE_OFF_DURATION = new PropertyDescriptor.Builder() + .name("Age Off Duration") + .description("Time interval to age off cached FlowFiles") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final Relationship REL_DUPLICATE = new Relationship.Builder().name("duplicate") + .description("If a FlowFile has been detected to be a duplicate, it will be routed to this relationship").build(); + public static final Relationship REL_NON_DUPLICATE = new Relationship.Builder().name("non-duplicate") + .description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("If unable to communicate with the cache, the FlowFile will be penalized and routed to this relationship").build(); + private final Set<Relationship> relationships; + + private final Serializer<String> keySerializer = new StringSerializer(); + private final Serializer<CacheValue> valueSerializer = new CacheValueSerializer(); + private final Deserializer<CacheValue> valueDeserializer = new CacheValueDeserializer(); + + public DetectDuplicate() { + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_DUPLICATE); + rels.add(REL_NON_DUPLICATE); + rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(CACHE_ENTRY_IDENTIFIER); + descriptors.add(FLOWFILE_DESCRIPTION); + descriptors.add(AGE_OFF_DURATION); + descriptors.add(DISTRIBUTED_CACHE_SERVICE); + return descriptors; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + 
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final String cacheKey = context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isBlank(cacheKey)) { + logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + final Long durationMS = context.getProperty(AGE_OFF_DURATION).asTimePeriod(TimeUnit.MILLISECONDS); + final long now = System.currentTimeMillis(); + + try { + final String flowFileDescription = context.getProperty(FLOWFILE_DESCRIPTION).evaluateAttributeExpressions(flowFile).getValue(); + final CacheValue cacheValue = new CacheValue(flowFileDescription, now); + final CacheValue originalCacheValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer); + boolean duplicate = originalCacheValue != null; + if (duplicate && durationMS != null && (now >= originalCacheValue.getEntryTimeMS() + durationMS)) { + boolean status = cache.remove(cacheKey, keySerializer); + logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status}); + // this should typically result in duplicate being false...but, better safe than sorry + duplicate = !cache.putIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer); + } + if (duplicate) { + session.getProvenanceReporter().route(flowFile, REL_DUPLICATE, "Duplicate of: " + ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME); + String originalFlowFileDescription = originalCacheValue.getDescription(); + flowFile = session.putAttribute(flowFile, 
ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME, originalFlowFileDescription); + session.transfer(flowFile, REL_DUPLICATE); + logger.info("Found {} to be a duplicate of FlowFile with description {}", new Object[]{flowFile, originalFlowFileDescription}); + session.adjustCounter("Duplicates Detected", 1L, false); + } else { + session.getProvenanceReporter().route(flowFile, REL_NON_DUPLICATE); + session.transfer(flowFile, REL_NON_DUPLICATE); + logger.info("Could not find a duplicate entry in cache for {}; routing to non-duplicate", new Object[]{flowFile}); + session.adjustCounter("Non-Duplicate Files Processed", 1L, false); + } + } catch (final IOException e) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e}); + } + } + + private static class CacheValue { + + private final String description; + private final long entryTimeMS; + + public CacheValue(String description, long entryTimeMS) { + this.description = description; + this.entryTimeMS = entryTimeMS; + } + + public String getDescription() { + return description; + } + + public long getEntryTimeMS() { + return entryTimeMS; + } + + } + + private static class CacheValueSerializer implements Serializer<CacheValue> { + + @Override + public void serialize(final CacheValue entry, final OutputStream out) throws SerializationException, IOException { + long time = entry.getEntryTimeMS(); + byte[] writeBuffer = new byte[8]; + writeBuffer[0] = (byte) (time >>> 56); + writeBuffer[1] = (byte) (time >>> 48); + writeBuffer[2] = (byte) (time >>> 40); + writeBuffer[3] = (byte) (time >>> 32); + writeBuffer[4] = (byte) (time >>> 24); + writeBuffer[5] = (byte) (time >>> 16); + writeBuffer[6] = (byte) (time >>> 8); + writeBuffer[7] = (byte) (time); + out.write(writeBuffer, 0, 8); + out.write(entry.getDescription().getBytes(StandardCharsets.UTF_8)); + } + } + + private static class CacheValueDeserializer 
implements Deserializer<CacheValue> { + + @Override + public CacheValue deserialize(final byte[] input) throws DeserializationException, IOException { + if (input.length == 0) { + return null; + } + long time = ((long) input[0] << 56) + + ((long) (input[1] & 255) << 48) + + ((long) (input[2] & 255) << 40) + + ((long) (input[3] & 255) << 32) + + ((long) (input[4] & 255) << 24) + + ((input[5] & 255) << 16) + + ((input[6] & 255) << 8) + + ((input[7] & 255)); + String description = new String(input, 8, input.length - 8, StandardCharsets.UTF_8); + CacheValue value = new CacheValue(description, time); + return value; + } + } + + private static class StringSerializer implements Serializer<String> { + + @Override + public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { + out.write(value.getBytes(StandardCharsets.UTF_8)); + } + } + + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java index 0000000,3ac55d2..8fb8ad4 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java @@@ -1,0 -1,498 +1,498 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashSet; + import java.util.LinkedHashMap; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.TreeSet; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.atomic.AtomicReference; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.PropertyValue; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.components.Validator; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.loading.LoadDistributionListener; + import org.apache.nifi.loading.LoadDistributionService; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; -import org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; ++import 
org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; + import org.apache.nifi.processor.util.StandardValidators; + + import org.apache.commons.lang3.StringUtils; + + @EventDriven + @SideEffectFree + @SupportsBatching + @TriggerWhenAnyDestinationAvailable + @Tags({"distribute", "load balance", "route", "round robin", "weighted"}) + @CapabilityDescription("Distributes FlowFiles to downstream processors based on a Distribution Strategy. If using the Round Robin " + + "strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional properties" + + "can be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name " + + "'5' will be receive 10 FlowFiles in each iteration instead of 1.") + public class DistributeLoad extends AbstractProcessor { + + public static final String STRATEGY_ROUND_ROBIN = "round robin"; + public static final String STRATEGY_NEXT_AVAILABLE = "next available"; + public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load distribution service"; + + public static final PropertyDescriptor NUM_RELATIONSHIPS = new PropertyDescriptor.Builder() + .name("Number of Relationships") + .description("Determines the number of Relationships to which the load should be distributed") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); + public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder() + .name("Distribution Strategy") + .description( + "Determines how the load will be distributed. 
If using Round Robin, will not distribute any FlowFiles unless all destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 destination can accept FlowFiles.") + .required(true) + .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE) + .defaultValue(STRATEGY_ROUND_ROBIN) + .build(); + + public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder() + .name("Hostnames") + .description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter") + .required(true) + .addValidator(new Validator() { + + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + ValidationResult result = new ValidationResult.Builder() + .subject(subject) + .valid(true) + .input(input) + .explanation("Good FQDNs") + .build(); + if (null == input) { + result = new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Need to specify delimited list of FQDNs") + .build(); + return result; + } + String[] hostNames = input.split("(?:,+|;+|\\s+)"); + for (String hostName : hostNames) { + if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) { + result = new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Need a FQDN rather than a simple host name.") + .build(); + return result; + } + } + return result; + } + }) + .build(); + public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder() + .name("Load Distribution Service ID") + .description("The identifier of the Load Distribution Service") + .required(true) + .identifiesControllerService(LoadDistributionService.class) + .build(); + + private List<PropertyDescriptor> properties; + private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>(); + 
private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<DistributionStrategy>(new RoundRobinStrategy()); + private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>(); + private final AtomicBoolean doCustomValidate = new AtomicBoolean(false); + private volatile LoadDistributionListener myListener; + private final AtomicBoolean doSetProps = new AtomicBoolean(true); + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(createRelationship(1)); + relationshipsRef.set(Collections.unmodifiableSet(relationships)); + + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(NUM_RELATIONSHIPS); + properties.add(DISTRIBUTION_STRATEGY); + this.properties = Collections.unmodifiableList(properties); + } + + private static Relationship createRelationship(final int num) { + return new Relationship.Builder().name(String.valueOf(num)).build(); + } + + @Override + public Set<Relationship> getRelationships() { + return relationshipsRef.get(); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (descriptor.equals(NUM_RELATIONSHIPS)) { + final Set<Relationship> relationships = new HashSet<>(); + for (int i = 1; i <= Integer.parseInt(newValue); i++) { + relationships.add(createRelationship(i)); + } + this.relationshipsRef.set(Collections.unmodifiableSet(relationships)); + } else if (descriptor.equals(DISTRIBUTION_STRATEGY)) { + switch (newValue.toLowerCase()) { + case STRATEGY_ROUND_ROBIN: + strategyRef.set(new RoundRobinStrategy()); + break; + case STRATEGY_NEXT_AVAILABLE: + strategyRef.set(new NextAvailableStrategy()); + break; + case STRATEGY_LOAD_DISTRIBUTION_SERVICE: + strategyRef.set(new LoadDistributionStrategy()); + } + doSetProps.set(true); + doCustomValidate.set(true); + } + } + + 
@Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + if (strategyRef.get() instanceof LoadDistributionStrategy && doSetProps.getAndSet(false)) { + final List<PropertyDescriptor> props = new ArrayList<>(properties); + props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE); + props.add(HOSTNAMES); + this.properties = Collections.unmodifiableList(props); + } else if (doSetProps.getAndSet(false)) { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(NUM_RELATIONSHIPS); + props.add(DISTRIBUTION_STRATEGY); + this.properties = Collections.unmodifiableList(props); + } + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + // validate that the property name is valid. + final int numRelationships = this.relationshipsRef.get().size(); + try { + final int value = Integer.parseInt(propertyDescriptorName); + if (value <= 0 || value > numRelationships) { + return new PropertyDescriptor.Builder().addValidator(new InvalidPropertyNameValidator(propertyDescriptorName)) + .name(propertyDescriptorName).build(); + } + } catch (final NumberFormatException e) { + return new PropertyDescriptor.Builder().addValidator(new InvalidPropertyNameValidator(propertyDescriptorName)) + .name(propertyDescriptorName).build(); + } + + // validate that the property value is valid + return new PropertyDescriptor.Builder().addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .name(propertyDescriptorName).dynamic(true).build(); + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + Collection<ValidationResult> results = new ArrayList<>(); + if (doCustomValidate.getAndSet(false)) { + String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue(); + if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) { + // make sure Hostnames and Controller service are set + PropertyValue propDesc 
= validationContext.getProperty(HOSTNAMES); + if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) { + results.add(new ValidationResult.Builder() + .subject(HOSTNAMES.getName()) + .explanation("Must specify Hostnames when using 'Load Distribution Strategy'") + .valid(false) + .build()); + } + propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE); + if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) { + results.add(new ValidationResult.Builder() + .subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName()) + .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'") + .valid(false) + .build()); + } + if (results.isEmpty()) { + int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger(); + String hostNamesValue = validationContext.getProperty(HOSTNAMES).getValue(); + String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)"); + int numHosts = 0; + for (String hostName : hostNames) { + if (StringUtils.isNotBlank(hostName)) { + hostNames[numHosts++] = hostName; + } + } + if (numHosts > numRels) { + results.add(new ValidationResult.Builder() + .subject("Number of Relationships and Hostnames") + .explanation("Number of Relationships must be equal to, or greater than, the number of host names") + .valid(false) + .build()); + } else { + // create new relationships with descriptions of hostname + Set<Relationship> relsWithDesc = new TreeSet<>(); + for (int i = 0; i < numHosts; i++) { + relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1)).description(hostNames[i]).build()); + } + // add add'l rels if configuration requires it...it probably shouldn't + for (int i = numHosts + 1; i <= numRels; i++) { + relsWithDesc.add(createRelationship(i)); + } + relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc)); + } + } + } + } + return results; + } + + @OnScheduled + public void createWeightedList(final 
ProcessContext context) { + final Map<Integer, Integer> weightings = new LinkedHashMap<>(); + + String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue(); + if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) { + String hostNamesValue = context.getProperty(HOSTNAMES).getValue(); + String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)"); + Set<String> hostNameSet = new HashSet<>(); + for (String hostName : hostNames) { + if (StringUtils.isNotBlank(hostName)) { + hostNameSet.add(hostName); + } + } + LoadDistributionService svc = context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class); + myListener = new LoadDistributionListener() { + + @Override + public void update(Map<String, Integer> loadInfo) { + for (Relationship rel : relationshipsRef.get()) { + String hostname = rel.getDescription(); + Integer weight = 1; + if (loadInfo.containsKey(hostname)) { + weight = loadInfo.get(hostname); + } + weightings.put(Integer.decode(rel.getName()), weight); + } + updateWeightedRelationships(weightings); + } + }; + + Map<String, Integer> loadInfo = svc.getLoadDistribution(hostNameSet, myListener); + for (Relationship rel : relationshipsRef.get()) { + String hostname = rel.getDescription(); + Integer weight = 1; + if (loadInfo.containsKey(hostname)) { + weight = loadInfo.get(hostname); + } + weightings.put(Integer.decode(rel.getName()), weight); + } + + } else { + final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger(); + for (int i = 1; i <= numRelationships; i++) { + weightings.put(i, 1); + } + for (final PropertyDescriptor propDesc : context.getProperties().keySet()) { + if (!this.properties.contains(propDesc)) { + final int relationship = Integer.parseInt(propDesc.getName()); + final int weighting = context.getProperty(propDesc).asInteger(); + weightings.put(relationship, weighting); + } + } + } + updateWeightedRelationships(weightings); + } + + private void 
updateWeightedRelationships(final Map<Integer, Integer> weightings) { + final List<Relationship> relationshipList = new ArrayList<>(); + for (final Map.Entry<Integer, Integer> entry : weightings.entrySet()) { + final String relationshipName = String.valueOf(entry.getKey()); + final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); + for (int i = 0; i < entry.getValue(); i++) { + relationshipList.add(relationship); + } + } + + this.weightedRelationshipListRef.set(Collections.unmodifiableList(relationshipList)); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final DistributionStrategy strategy = strategyRef.get(); + final Set<Relationship> available = context.getAvailableRelationships(); + final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger(); + final boolean allDestinationsAvailable = (available.size() == numRelationships); + if (!allDestinationsAvailable && strategy.requiresAllDestinationsAvailable()) { + return; + } + + final Relationship relationship = strategy.mapToRelationship(context, flowFile); + if (relationship == null) { + // can't transfer the FlowFiles. 
Roll back and yield + session.rollback(); + context.yield(); + return; + } + + session.transfer(flowFile, relationship); + session.getProvenanceReporter().route(flowFile, relationship); + } + + private static class InvalidPropertyNameValidator implements Validator { + + private final String propertyName; + + public InvalidPropertyNameValidator(final String propertyName) { + this.propertyName = propertyName; + } + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) { + return new ValidationResult.Builder().subject("Property Name").input(propertyName) + .explanation("Property Name must be a positive integer between 1 and the number of relationships (inclusive)") + .valid(false) + .build(); + } + } + + /** + * Implementations must be thread-safe. + */ + private static interface DistributionStrategy { + + /** + * Returns a mapping of FlowFile to Relationship or <code>null</code> if + * the needed relationships are not available to accept files. 
+ * + * @param session + * @param flowFiles + * @return + */ + Relationship mapToRelationship(ProcessContext context, FlowFile flowFile); + + boolean requiresAllDestinationsAvailable(); + } + + private class LoadDistributionStrategy implements DistributionStrategy { + + private final AtomicLong counter = new AtomicLong(0L); + + @Override + public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { + final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); + final int numRelationships = relationshipList.size(); + + // create a HashSet that contains all of the available relationships, as calling #contains on HashSet + // is much faster than calling it on a List + boolean foundFreeRelationship = false; + Relationship relationship = null; + + int attempts = 0; + while (!foundFreeRelationship) { + final long counterValue = counter.getAndIncrement(); + final int idx = (int) (counterValue % numRelationships); + relationship = relationshipList.get(idx); + foundFreeRelationship = context.getAvailableRelationships().contains(relationship); + if (++attempts % numRelationships == 0 && !foundFreeRelationship) { + return null; + } + } + + return relationship; + } + + @Override + public boolean requiresAllDestinationsAvailable() { + return false; + } + + } + + private class RoundRobinStrategy implements DistributionStrategy { + + private final AtomicLong counter = new AtomicLong(0L); + + @Override + public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { + final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); + final long counterValue = counter.getAndIncrement(); + final int idx = (int) (counterValue % relationshipList.size()); + final Relationship relationship = relationshipList.get(idx); + return relationship; + } + + @Override + public boolean requiresAllDestinationsAvailable() { + return true; + } + } + + private 
class NextAvailableStrategy implements DistributionStrategy { + + private final AtomicLong counter = new AtomicLong(0L); + + @Override + public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { + final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); + final int numRelationships = relationshipList.size(); + + // create a HashSet that contains all of the available relationships, as calling #contains on HashSet + // is much faster than calling it on a List + boolean foundFreeRelationship = false; + Relationship relationship = null; + + int attempts = 0; + while (!foundFreeRelationship) { + final long counterValue = counter.getAndIncrement(); + final int idx = (int) (counterValue % numRelationships); + relationship = relationshipList.get(idx); + foundFreeRelationship = context.getAvailableRelationships().contains(relationship); + if (++attempts % numRelationships == 0 && !foundFreeRelationship) { + return null; + } + } + + return relationship; + } + + @Override + public boolean requiresAllDestinationsAvailable() { + return false; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java index 0000000,eb079bb..69cb18e mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java @@@ -1,0 -1,263 +1,263 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under 
one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.SecureRandom;
import java.security.Security;
import java.text.Normalizer;
import java.util.*;
import java.util.concurrent.TimeUnit;

import javax.crypto.*;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.PBEParameterSpec;

import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.EncryptionMethod;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

import org.bouncycastle.jce.provider.BouncyCastleProvider;

/**
 * Processor that password-encrypts or decrypts FlowFile content using a JCE
 * password-based-encryption (PBE) cipher. On encryption a random salt of the
 * cipher's block size is generated and written as the first bytes of the
 * output; on decryption the salt is read back from the head of the content.
 */
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"encryption", "decryption", "password", "JCE"})
@CapabilityDescription("Encrypts or Decrypts a FlowFile using a randomly generated salt")
public class EncryptContent extends AbstractProcessor {

    public static final String ENCRYPT_MODE = "Encrypt";
    public static final String DECRYPT_MODE = "Decrypt";
    public static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
    // Fallback salt size in bytes, used when the cipher reports a block size
    // of 0 (i.e. stream ciphers).
    public static final int DEFAULT_SALT_SIZE = 8;

    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
            .name("Mode")
            .description("Specifies whether the content should be encrypted or decrypted")
            .required(true)
            .allowableValues(ENCRYPT_MODE, DECRYPT_MODE)
            .defaultValue(ENCRYPT_MODE)
            .build();
    public static final PropertyDescriptor ENCRYPTION_ALGORITHM = new PropertyDescriptor.Builder()
            .name("Encryption Algorithm")
            .description("The Encryption Algorithm to use")
            .required(true)
            .allowableValues(EncryptionMethod.values())
            .defaultValue(EncryptionMethod.MD5_256AES.name())
            .build();
    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
            .name("Password")
            .description("The Password to use for encrypting or decrypting the data")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .sensitive(true)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully encrypted or decrypted will be routed to success").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be encrypted or decrypted will be routed to failure").build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    static {
        // add BouncyCastle encryption providers
        Security.addProvider(new BouncyCastleProvider());
    }

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(MODE);
        properties.add(ENCRYPTION_ALGORITHM);
        properties.add(PASSWORD);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ProcessorLog logger = getLogger();
        final String method = context.getProperty(ENCRYPTION_ALGORITHM).getValue();
        final EncryptionMethod encryptionMethod = EncryptionMethod.valueOf(method);
        final String providerName = encryptionMethod.getProvider();
        final String algorithm = encryptionMethod.getAlgorithm();

        // Normalize the password to NFC so that visually identical passwords
        // entered on different platforms produce the same key material.
        final String password = context.getProperty(PASSWORD).getValue();
        final char[] normalizedPassword = Normalizer.normalize(password, Normalizer.Form.NFC).toCharArray();
        final PBEKeySpec pbeKeySpec = new PBEKeySpec(normalizedPassword);

        final SecureRandom secureRandom;
        final SecretKeyFactory factory;
        final SecretKey secretKey;
        final Cipher cipher;
        try {
            // SecureRandom self-seeds from the platform entropy source on first
            // use. Do NOT call setSeed(System.currentTimeMillis()) here: for
            // SHA1PRNG, a setSeed() before the first nextBytes() REPLACES the
            // internal seed, which would make every generated salt predictable
            // from the wall-clock time.
            secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
            factory = SecretKeyFactory.getInstance(algorithm, providerName);
            secretKey = factory.generateSecret(pbeKeySpec);
            cipher = Cipher.getInstance(algorithm, providerName);
        } catch (final Exception e) {
            logger.error("failed to initialize Encryption/Decryption algorithm due to {}", new Object[]{e});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // The salt is sized to the cipher block; stream ciphers report a block
        // size of 0, so fall back to a fixed default for them.
        final int algorithmBlockSize = cipher.getBlockSize();
        final int saltSize = (algorithmBlockSize > 0) ? algorithmBlockSize : DEFAULT_SALT_SIZE;

        final StopWatch stopWatch = new StopWatch(true);
        if (context.getProperty(MODE).getValue().equalsIgnoreCase(ENCRYPT_MODE)) {
            final byte[] salt = new byte[saltSize];
            secureRandom.nextBytes(salt);

            final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
            try {
                cipher.init(Cipher.ENCRYPT_MODE, secretKey, parameterSpec);
            } catch (final InvalidKeyException | InvalidAlgorithmParameterException e) {
                logger.error("unable to encrypt {} due to {}", new Object[]{flowFile, e});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            flowFile = session.write(flowFile, new EncryptCallback(cipher, salt));
            logger.info("Successfully encrypted {}", new Object[]{flowFile});
        } else {
            // Decrypting: the content must at least contain the salt prefix
            // plus some ciphertext, otherwise there is nothing to decrypt.
            if (flowFile.getSize() <= saltSize) {
                logger.error("Cannot decrypt {} because its file size is not greater than the salt size", new Object[]{flowFile});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            flowFile = session.write(flowFile, new DecryptCallback(cipher, secretKey, saltSize));
            logger.info("Successfully decrypted {}", new Object[]{flowFile});
        }

        session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
        session.transfer(flowFile, REL_SUCCESS);
    }

    /**
     * Stream callback that reads the salt from the head of the input, then
     * decrypts the remainder of the stream to the output.
     */
    private static class DecryptCallback implements StreamCallback {

        private final Cipher cipher;
        private final SecretKey secretKey;
        private final int saltSize;

        public DecryptCallback(final Cipher cipher, final SecretKey secretKey, final int saltSize) {
            this.cipher = cipher;
            this.secretKey = secretKey;
            this.saltSize = saltSize;
        }

        @Override
        public void process(final InputStream in, final OutputStream out) throws IOException {
            // The first saltSize bytes of the content are the salt that was
            // written by EncryptCallback.
            final byte[] salt = new byte[saltSize];
            StreamUtils.fillBuffer(in, salt);

            final PBEParameterSpec parameterSpec = new PBEParameterSpec(salt, 1000);
            try {
                cipher.init(Cipher.DECRYPT_MODE, secretKey, parameterSpec);
            } catch (final Exception e) {
                throw new ProcessException(e);
            }

            final byte[] buffer = new byte[65536];
            int len;
            while ((len = in.read(buffer)) > 0) {
                // update() may buffer internally and return null until it has
                // a full block to emit.
                final byte[] decryptedBytes = cipher.update(buffer, 0, len);
                if (decryptedBytes != null) {
                    out.write(decryptedBytes);
                }
            }

            try {
                out.write(cipher.doFinal());
            } catch (final Exception e) {
                throw new ProcessException(e);
            }
        }
    }

    /**
     * Stream callback that writes the salt as a prefix, then encrypts the
     * input stream to the output.
     */
    private static class EncryptCallback implements StreamCallback {

        private final Cipher cipher;
        private final byte[] salt;

        public EncryptCallback(final Cipher cipher, final byte[] salt) {
            this.cipher = cipher;
            this.salt = salt;
        }

        @Override
        public void process(final InputStream in, final OutputStream out) throws IOException {
            // Prefix the output with the salt so DecryptCallback can recover it.
            out.write(salt);

            final byte[] buffer = new byte[65536];
            int len;
            while ((len = in.read(buffer)) > 0) {
                final byte[] encryptedBytes = cipher.update(buffer, 0, len);
                if (encryptedBytes != null) {
                    out.write(encryptedBytes);
                }
            }

            try {
                out.write(cipher.doFinal());
            } catch (final IllegalBlockSizeException | BadPaddingException e) {
                throw new ProcessException(e);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateRegularExpression.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateRegularExpression.java index 0000000,7697d06..4140943 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateRegularExpression.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateRegularExpression.java @@@ -1,0 -1,294 +1,294 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.IOException; + import java.io.InputStream; + import java.nio.charset.Charset; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.regex.Matcher; + import java.util.regex.Pattern; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.stream.io.StreamUtils; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.DataUnit; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + + import org.apache.commons.lang3.StringUtils; + + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"evaluate", "Text", "Regular Expression", "regex", "experimental"}) + @CapabilityDescription( + "Evaluates one or more Regular Expressions against the content of a FlowFile. 
" + + "The results of those Regular Expressions are assigned to FlowFile Attributes. " + + "Regular Expressions are entered by adding user-defined properties; " + + "the name of the property maps to the Attribute Name into which the result will be placed. " + + "The value of the property must be a valid Regular Expressions with exactly one capturing group. " + + "If the Regular Expression matches more than once, only the first match will be used. " + + "If any provided Regular Expression matches, the FlowFile(s) will be routed to 'matched'. " + + "If no provided Regular Expression matches, the FlowFile will be routed to 'unmatched' and no attributes will be applied to the FlowFile.") + + public class EvaluateRegularExpression extends AbstractProcessor { + + public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("The Character Set in which the file is encoded") + .required(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .build(); + + public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Maximum Buffer Size") + .description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. 
Files larger than the specified maximum will not be fully evaluated.") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + + public static final PropertyDescriptor CANON_EQ = new PropertyDescriptor.Builder() + .name("Enable Canonical Equivalence") + .description("Indicates that two characters match only when their full canonical decompositions match.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor CASE_INSENSITIVE = new PropertyDescriptor.Builder() + .name("Enable Case-insensitive Matching") + .description("Indicates that two characters match even if they are in a different case. Can also be specified via the embeded flag (?i).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor COMMENTS = new PropertyDescriptor.Builder() + .name("Permit Whitespace and Comments in Pattern") + .description("In this mode, whitespace is ignored, and embedded comments starting with # are ignored until the end of a line. Can also be specified via the embeded flag (?x).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor DOTALL = new PropertyDescriptor.Builder() + .name("Enable DOTALL Mode") + .description("Indicates that the expression '.' should match any character, including a line terminator. 
Can also be specified via the embeded flag (?s).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor LITERAL = new PropertyDescriptor.Builder() + .name("Enable Literal Parsing of the Pattern") + .description("Indicates that Metacharacters and escape characters should be given no special meaning.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor MULTILINE = new PropertyDescriptor.Builder() + .name("Enable Multiline Mode") + .description("Indicates that '^' and '$' should match just after and just before a line terminator or end of sequence, instead of only the begining or end of the entire input. Can also be specified via the embeded flag (?m).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor UNICODE_CASE = new PropertyDescriptor.Builder() + .name("Enable Unicode-aware Case Folding") + .description("When used with 'Enable Case-insensitive Matching', matches in a manner consistent with the Unicode Standard. Can also be specified via the embeded flag (?u).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor UNICODE_CHARACTER_CLASS = new PropertyDescriptor.Builder() + .name("Enable Unicode Predefined Character Classes") + .description("Specifies conformance with the Unicode Technical Standard #18: Unicode Regular Expression Annex C: Compatibility Properties. Can also be specified via the embeded flag (?U).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor UNIX_LINES = new PropertyDescriptor.Builder() + .name("Enable Unix Lines Mode") + .description("Indicates that only the '\n' line terminator is recognized int the behavior of '.', '^', and '$'. 
Can also be specified via the embeded flag (?d).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final Relationship REL_MATCH = new Relationship.Builder() + .name("matched") + .description( + "FlowFiles are routed to this relationship when the Regular Expression is successfully evaluated and the FlowFile " + + "is modified as a result") + .build(); + + public static final Relationship REL_NO_MATCH = new Relationship.Builder() + .name("unmatched") + .description( + "FlowFiles are routed to this relationship when no provided Regular Expression matches the content of the FlowFile") + .build(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_MATCH); + relationships.add(REL_NO_MATCH); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(CHARACTER_SET); + properties.add(MAX_BUFFER_SIZE); + properties.add(CANON_EQ); + properties.add(CASE_INSENSITIVE); + properties.add(COMMENTS); + properties.add(DOTALL); + properties.add(LITERAL); + properties.add(MULTILINE); + properties.add(UNICODE_CASE); + properties.add(UNICODE_CHARACTER_CLASS); + properties.add(UNIX_LINES); + this.properties = Collections.unmodifiableList(properties); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .expressionLanguageSupported(false) + 
.addValidator(StandardValidators.createRegexValidator(1, 1, true)) + .required(false) + .dynamic(true) + .build(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final List<FlowFile> flowFileBatch = session.get(50); + if (flowFileBatch.isEmpty()) { + return; + } + final ProcessorLog logger = getLogger(); + + // Compile the Regular Expressions + Map<String, Matcher> regexMap = new HashMap<>(); + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if (!entry.getKey().isDynamic()) { + continue; + } + final int flags = getCompileFlags(context); + final Matcher matcher = Pattern.compile(entry.getValue(), flags).matcher(""); + regexMap.put(entry.getKey().getName(), matcher); + } + + final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); + + final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + + for (FlowFile flowFile : flowFileBatch) { + + final Map<String, String> regexResults = new HashMap<>(); + + final byte[] buffer = new byte[maxBufferSize]; + + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer, false); + } + }); + + final int flowFileSize = Math.min((int) flowFile.getSize(), maxBufferSize); + + final String contentString = new String(buffer, 0, flowFileSize, charset); + + for (final Map.Entry<String, Matcher> entry : regexMap.entrySet()) { + + final Matcher matcher = entry.getValue(); + + matcher.reset(contentString); + + if (matcher.find()) { + final String group = matcher.group(1); + if (!StringUtils.isBlank(group)) { + regexResults.put(entry.getKey(), group); + } + } + } + + if (!regexResults.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, regexResults); + session.getProvenanceReporter().modifyAttributes(flowFile); + session.transfer(flowFile, REL_MATCH); + 
logger.info("Matched {} Regular Expressions and added attributes to FlowFile {}", new Object[]{regexResults.size(), flowFile}); + } else { + session.transfer(flowFile, REL_NO_MATCH); + logger.info("Did not match any Regular Expressions for FlowFile {}", new Object[]{flowFile}); + } + + } // end flowFileLoop + } + + int getCompileFlags(ProcessContext context) { + int flags = (context.getProperty(UNIX_LINES).asBoolean() ? Pattern.UNIX_LINES : 0) + | (context.getProperty(CASE_INSENSITIVE).asBoolean() ? Pattern.CASE_INSENSITIVE : 0) + | (context.getProperty(COMMENTS).asBoolean() ? Pattern.COMMENTS : 0) + | (context.getProperty(MULTILINE).asBoolean() ? Pattern.MULTILINE : 0) + | (context.getProperty(LITERAL).asBoolean() ? Pattern.LITERAL : 0) + | (context.getProperty(DOTALL).asBoolean() ? Pattern.DOTALL : 0) + | (context.getProperty(UNICODE_CASE).asBoolean() ? Pattern.UNICODE_CASE : 0) + | (context.getProperty(CANON_EQ).asBoolean() ? Pattern.CANON_EQ : 0) + | (context.getProperty(UNICODE_CHARACTER_CLASS).asBoolean() ? Pattern.UNICODE_CHARACTER_CLASS : 0); + return flags; + } + }
