http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java index 0000000,5d583ca..7d191fb mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java @@@ -1,0 -1,197 +1,163 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.ssl; + -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - + import java.util.HashMap; ++import java.util.Map; + -import org.apache.nifi.controller.ControllerService; + import org.apache.nifi.reporting.InitializationException; + import org.apache.nifi.ssl.SSLContextService.ClientAuth; + import org.apache.nifi.util.TestRunner; + import org.apache.nifi.util.TestRunners; - + import org.junit.Assert; + import org.junit.Test; + + public class SSLContextServiceTest { + + @Test - public void testBad1() { - try { - TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); - SSLContextService service = new StandardSSLContextService(); - HashMap<String, String> properties = new HashMap<String, String>(); - runner.addControllerService("test-bad1", service, properties); - Assert.fail("Should have thrown an Exception"); - } catch (InitializationException e) { - assertEquals( - "org.apache.nifi.reporting.InitializationException: SSLContextService[id=test-bad1] does not have the KeyStore or the TrustStore populated", - e.getCause().getCause().toString()); - } ++ public void testBad1() throws InitializationException { ++ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); ++ final SSLContextService service = new StandardSSLContextService(); ++ final Map<String, String> properties = new HashMap<String, String>(); ++ runner.addControllerService("test-bad1", service, properties); ++ runner.assertNotValid(service); + } + + @Test - public void testBad2() { - try { - TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); - SSLContextService service = new StandardSSLContextService(); - HashMap<String, String> properties = new HashMap<String, String>(); - properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); - properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); - runner.addControllerService("test-bad2", service, properties); - Assert.fail("Should have thrown an Exception"); - } catch (InitializationException e) { - assertEquals( - "org.apache.nifi.reporting.InitializationException: SSLContextService[id=test-bad2] is not valid due to:\n'Keystore Properties' is invalid because Must set either 0 or 3 properties for Keystore", - e.getCause().getCause().toString()); - } ++ public void testBad2() throws InitializationException { ++ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); ++ final SSLContextService service = new StandardSSLContextService(); ++ final Map<String, String> properties = new HashMap<String, String>(); ++ properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); ++ properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); ++ runner.addControllerService("test-bad2", service, properties); ++ runner.assertNotValid(service); + } + + @Test - public void testBad3() { - try { - TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); - SSLContextService service = new StandardSSLContextService(); - HashMap<String, String> properties = new HashMap<String, String>(); - properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); - properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); - properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); - properties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); - runner.addControllerService("test-bad3", service, properties); - Assert.fail("Should have thrown an Exception"); - } catch (InitializationException e) { - assertEquals( - "org.apache.nifi.reporting.InitializationException: SSLContextService[id=test-bad3] is not valid due to:\n'Truststore Properties' is invalid because Must set either 0 or 3 properties for Truststore", - e.getCause().getCause().toString()); - } ++ public void testBad3() throws InitializationException { ++ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); ++ final SSLContextService service = new StandardSSLContextService(); ++ final Map<String, String> properties = new HashMap<String, String>(); ++ properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); ++ properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); ++ properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); ++ properties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); ++ runner.addControllerService("test-bad3", service, properties); ++ runner.assertNotValid(service); + } + + @Test - public void testBad4() { - try { - TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); - SSLContextService service = new StandardSSLContextService(); - HashMap<String, String> properties = new HashMap<String, String>(); - properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); - properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "wrongpassword"); - properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "PKCS12"); - properties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); - properties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "wrongpassword"); - properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); - runner.addControllerService("test-bad4", service, properties); - Assert.fail("Should have thrown an Exception"); - } catch (InitializationException e) { - assertEquals( - "org.apache.nifi.reporting.InitializationException: SSLContextService[id=test-bad4] is not valid due to:\n" - + "'Keystore Properties' is invalid because Invalid KeyStore Password or Type specified for file src/test/resources/localhost-ks.jks\n" - + "'Truststore Properties' is invalid because Invalid KeyStore Password or Type specified for file src/test/resources/localhost-ts.jks", - e.getCause().getCause().toString()); - } ++ public void testBad4() throws InitializationException { ++ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); ++ final SSLContextService service = new StandardSSLContextService(); ++ final Map<String, String> properties = new HashMap<String, String>(); ++ properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); ++ properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "wrongpassword"); ++ properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "PKCS12"); ++ properties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); ++ properties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "wrongpassword"); ++ properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); ++ runner.addControllerService("test-bad4", service, properties); ++ ++ runner.assertNotValid(service); + } + + @Test - public void testBad5() { - try { - TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); - SSLContextService service = new StandardSSLContextService(); - HashMap<String, String> properties = new HashMap<String, String>(); - properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/DOES-NOT-EXIST.jks"); - properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); - properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "PKCS12"); - properties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); - properties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); - properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); - runner.addControllerService("test-bad5", service, properties); - Assert.fail("Should have thrown an Exception"); - } catch (InitializationException e) { - assertTrue(e.getCause().getCause().toString().startsWith("org.apache.nifi.reporting.InitializationException: " - + "SSLContextService[id=test-bad5] is not valid due to:\n'Keystore Properties' is invalid " - + "because Cannot access file")); - } ++ public void testBad5() throws InitializationException { ++ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); ++ final SSLContextService service = new StandardSSLContextService(); ++ final Map<String, String> properties = new HashMap<String, String>(); ++ properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/DOES-NOT-EXIST.jks"); ++ properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); ++ properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "PKCS12"); ++ properties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); ++ properties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); ++ properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); ++ runner.addControllerService("test-bad5", service, properties); ++ runner.assertNotValid(service); + } + + @Test - public void testGood() { - try { - TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); - ControllerService service = new StandardSSLContextService(); - HashMap<String, String> properties = new HashMap<String, String>(); - properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); - properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); - properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "PKCS12"); - properties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); - properties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); - properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); - runner.addControllerService("test-good1", service, properties); - runner.setProperty("SSL Context Svc ID", "test-good1"); - runner.assertValid(); - service = runner.getProcessContext().getControllerServiceLookup().getControllerService("test-good1"); - Assert.assertNotNull(service); - Assert.assertTrue(service instanceof StandardSSLContextService); - SSLContextService sslService = (SSLContextService) service; - sslService.createSSLContext(ClientAuth.REQUIRED); - sslService.createSSLContext(ClientAuth.WANT); - sslService.createSSLContext(ClientAuth.NONE); - } catch (InitializationException e) { - } ++ public void testGood() throws InitializationException { ++ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); ++ SSLContextService service = new StandardSSLContextService(); ++ runner.addControllerService("test-good1", service); ++ runner.setProperty(service, StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); ++ runner.setProperty(service, StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); ++ runner.setProperty(service, StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); ++ runner.setProperty(service, StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); ++ runner.setProperty(service, StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); ++ runner.setProperty(service, StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); ++ runner.enableControllerService(service); ++ ++ runner.setProperty("SSL Context Svc ID", "test-good1"); ++ runner.assertValid(service); ++ service = (SSLContextService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-good1"); ++ Assert.assertNotNull(service); ++ SSLContextService sslService = (SSLContextService) service; ++ sslService.createSSLContext(ClientAuth.REQUIRED); ++ sslService.createSSLContext(ClientAuth.WANT); ++ sslService.createSSLContext(ClientAuth.NONE); + } + + @Test + public void testGoodTrustOnly() { + try { + TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + SSLContextService service = new StandardSSLContextService(); + HashMap<String, String> properties = new HashMap<String, String>(); + properties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + properties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + runner.addControllerService("test-good2", service, properties); ++ runner.enableControllerService(service); ++ + runner.setProperty("SSL Context Svc ID", "test-good2"); + runner.assertValid(); + Assert.assertNotNull(service); + Assert.assertTrue(service instanceof StandardSSLContextService); + service.createSSLContext(ClientAuth.NONE); + } catch (InitializationException e) { + } + } + + @Test + public void testGoodKeyOnly() { + try { + TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + SSLContextService service = new StandardSSLContextService(); + HashMap<String, String> properties = new HashMap<String, String>(); + properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + runner.addControllerService("test-good3", service, properties); ++ runner.enableControllerService(service); ++ + runner.setProperty("SSL Context Svc ID", "test-good3"); + runner.assertValid(); + Assert.assertNotNull(service); + Assert.assertTrue(service instanceof StandardSSLContextService); + SSLContextService sslService = service; + sslService.createSSLContext(ClientAuth.NONE); + } catch (Exception e) { + System.out.println(e); + Assert.fail("Should not have thrown a exception " + e.getMessage()); + } + } + + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index 0000000,3721df9..5e1e026 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@@ -1,0 -1,508 +1,507 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.attributes; + + import java.net.URI; + import java.net.URISyntaxException; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.atomic.AtomicReference; + ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.documentation.Tags; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.PropertyValue; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.expression.AttributeExpression; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.search.SearchContext; + import org.apache.nifi.search.SearchResult; + import org.apache.nifi.search.Searchable; + import org.apache.nifi.update.attributes.Action; + import org.apache.nifi.update.attributes.Condition; + import org.apache.nifi.update.attributes.Criteria; + import org.apache.nifi.update.attributes.Rule; + import org.apache.nifi.update.attributes.FlowFilePolicy; + import org.apache.nifi.update.attributes.serde.CriteriaSerDe; - + import org.apache.commons.lang3.StringUtils; + + /** + * This processor supports updating flowfile attributes and can do so + * conditionally or unconditionally. Like the FlowFileMetadataEnhancer, it can + * be configured with an arbitrary number of optional properties to define how + * attributes should be updated. Each optional property represents an action + * that is applied to all incoming flow files. An action is comprised of an + * attribute key and a format string. The format string supports the following + * parameters. + * <ul> + * <li>%1 - is the random generated UUID. </li> + * <li>%2 - is the current calendar time. </li> + * <li>${"attribute.key") - is the flow file attribute value of the key + * contained within the brackets.</li> + * </ul> + * + * When creating the optional properties, enter the attribute key as the + * property name and the desired format string as the value. The optional + * properties are considered default actions and are applied unconditionally. + * + * In addition to the default actions, this processor has a user interface (UI) + * where conditional actions can be specified. In the UI, rules can be created. + * Rules are comprised of an arbitrary number of conditions and actions. In + * order for a rule to be activated, all conditions must evaluate to true. + * + * A rule condition is comprised of an attribute key and a regular expression. A + * condition evaluates to true when the flowfile contains the attribute + * specified and it's value matches the specified regular expression. + * + * A rule action follows the same definition as a rule above. It includes an + * attribute key and a format string. The format string supports the same + * parameters defined above. + * + * When a rule is activated (because conditions evaluate to true), all actions + * in that rule are executed. Once each action has been applied, any remaining + * default actions will be applied. This means that if rule action and a default + * action modify the same attribute, only the rule action will execute. Default + * actions will only execute when the attribute in question is not modified as + * part of an activated rule. + * + * The incoming flow file is cloned for each rule that is activated. If no rule + * is activated, any default actions are applied to the original flowfile and it + * is transferred. + * + * This processor only supports a SUCCESS relationship. + * + * Note: In order for configuration changes made in the custom UI to take + * effect, the processor must be stopped and started. + */ + @EventDriven + @SideEffectFree + @Tags({"attributes", "modification", "update", "Attribute Expression Language"}) + @CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language") + public class UpdateAttribute extends AbstractProcessor implements Searchable { + + private final AtomicReference<Criteria> criteriaCache = new AtomicReference<>(null); + private final ConcurrentMap<String, PropertyValue> propertyValues = new ConcurrentHashMap<>(); + + private final Set<Relationship> relationships; + + // relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .description("All FlowFiles are routed to this relationship").name("success").build(); + + public UpdateAttribute() { + final Set<Relationship> relationshipSet = new HashSet<>(); + relationshipSet.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(relationshipSet); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + @OnScheduled + public void clearPropertyValueMap() { + propertyValues.clear(); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final List<ValidationResult> reasons = new ArrayList<>(super.customValidate(context)); + + Criteria criteria = null; + try { + criteria = CriteriaSerDe.deserialize(context.getAnnotationData()); + } catch (IllegalArgumentException iae) { + reasons.add(new ValidationResult.Builder().valid(false).explanation("Unable to deserialize the update criteria." + iae.getMessage()).build()); + } + + // if there is criteria, validate it + if (criteria != null) { + final List<Rule> rules = criteria.getRules(); + + if (rules == null) { + reasons.add(new ValidationResult.Builder().valid(false).explanation("Update criteria has been specified by no rules were found.").build()); + } else { + // validate the each rule + for (final Rule rule : rules) { + if (rule.getName() == null || rule.getName().trim().isEmpty()) { + reasons.add(new ValidationResult.Builder().valid(false).explanation("A rule name was not specified.").build()); + } + + // validate each condition + final Set<Condition> conditions = rule.getConditions(); + if (conditions == null) { + reasons.add(new ValidationResult.Builder().valid(false).explanation(String.format("No conditions for rule '%s' found.", rule.getName())).build()); + } else { + for (final Condition condition : conditions) { + if (condition.getExpression() == null) { + reasons.add(new ValidationResult.Builder().valid(false).explanation(String.format("No expression for a condition in rule '%s' was found.", rule.getName())).build()); + } else { + final String expression = condition.getExpression().trim(); + reasons.add(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false).validate(String.format("Condition for rule '%s'.", rule.getName()), expression, context)); + } + } + } + + // validate each action + final Set<Action> actions = rule.getActions(); + if (actions == null) { + reasons.add(new ValidationResult.Builder().valid(false).explanation(String.format("No actions for rule '%s' found.", rule.getName())).build()); + } else { + for (final Action action : actions) { + if (action.getAttribute() == null) { + reasons.add(new ValidationResult.Builder().valid(false).explanation(String.format("An action in rule '%s' is missing the attribute name.", rule.getName())).build()); + } else if (action.getValue() == null) { + reasons.add(new ValidationResult.Builder().valid(false).explanation(String.format("No value for attribute '%s' in rule '%s' was found.", action.getAttribute(), rule.getName())).build()); + } else { + reasons.add(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true).validate(String.format("Action for rule '%s'.", rule.getName()), action.getValue(), context)); + } + } + } + } + } + } + + return reasons; + } + + @Override + public Collection<SearchResult> search(final SearchContext context) { + final String term = context.getSearchTerm(); + + final Collection<SearchResult> results = new ArrayList<>(); + if (StringUtils.isBlank(context.getAnnotationData())) { + return results; + } + + try { + // parse the annotation data + final Criteria criteria = CriteriaSerDe.deserialize(context.getAnnotationData()); + + // ensure there are some rules + if (criteria.getRules() != null) { + final FlowFilePolicy flowFilePolicy = criteria.getFlowFilePolicy(); + if (flowFilePolicy != null && StringUtils.containsIgnoreCase(flowFilePolicy.name(), term)) { + results.add(new SearchResult.Builder().label("FlowFile policy").match(flowFilePolicy.name()).build()); + } + + for (final Rule rule : criteria.getRules()) { + if (StringUtils.containsIgnoreCase(rule.getName(), term)) { + results.add(new SearchResult.Builder().label("Rule name").match(rule.getName()).build()); + } + + // ensure there are some conditions + if (rule.getConditions() != null) { + for (final Condition condition : rule.getConditions()) { + if (StringUtils.containsIgnoreCase(condition.getExpression(), term)) { + results.add(new SearchResult.Builder().label(String.format("Condition in rule '%s'", rule.getName())).match(condition.getExpression()).build()); + } + } + } + + // ensure there are some actions + if (rule.getActions() != null) { + for (final Action action : rule.getActions()) { + if (StringUtils.containsIgnoreCase(action.getAttribute(), term)) { + results.add(new SearchResult.Builder().label(String.format("Action in rule '%s'", rule.getName())).match(action.getAttribute()).build()); + } + if (StringUtils.containsIgnoreCase(action.getValue(), term)) { + results.add(new SearchResult.Builder().label(String.format("Action in rule '%s'", rule.getName())).match(action.getValue()).build()); + } + } + } + } + } + + return results; + } catch (Exception e) { + return results; + } + } + + @OnScheduled + public void parseAnnotationData(final ProcessContext context) { + criteriaCache.set(CriteriaSerDe.deserialize(context.getAnnotationData())); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final ProcessorLog logger = getLogger(); + final Criteria criteria = criteriaCache.get(); + + List<FlowFile> flowFiles = session.get(100); + if (flowFiles.isEmpty()) { + return; + } + + final Map<PropertyDescriptor, String> properties = context.getProperties(); + + // get the default actions + final Map<String, Action> defaultActions = getDefaultActions(properties); + + // record which rule should be applied to which flow file - when operating + // in 'use clone' mode, this collection will contain a number of entries + // that map to single element lists. this is because the original flowfile + // is cloned for each matching rule. in 'use original' mode, this collection + // will contain a single entry that maps a list of multiple rules. this is + // because is the original flowfile is used for all matching rules. in this + // case the order of the matching rules is perserved in the list + final Map<FlowFile, List<Rule>> matchedRules = new HashMap<>(); + + for (FlowFile flowFile : flowFiles) { + matchedRules.clear(); + + // if there is update criteria specified, evaluate it + if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules)) { + // apply the actions for each rule and transfer the flowfile + for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) { + FlowFile match = entry.getKey(); + final List<Rule> rules = entry.getValue(); + + // execute each matching rule(s) + match = executeActions(session, context, rules, defaultActions, match); + logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()}); + + // transfer the match + session.getProvenanceReporter().modifyAttributes(match); + session.transfer(match, REL_SUCCESS); + } + } else { + // transfer the flowfile to no match (that has the default actions applied) + flowFile = executeActions(session, context, null, defaultActions, flowFile); + logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{flowFile, REL_SUCCESS.getName()}); + session.getProvenanceReporter().modifyAttributes(flowFile); + session.transfer(flowFile, REL_SUCCESS); + } + } + } + + /** + * Evaluates the specified Criteria on the specified flowfile. Clones the + * specified flow file for each rule that is applied. Returns a mapping of + * rules to flow files. + * + * @param criteria + * @param original + * @return + */ + private boolean evaluateCriteria(final ProcessSession session, final ProcessContext context, final Criteria criteria, final FlowFile flowfile, final Map<FlowFile, List<Rule>> matchedRules) { + final ProcessorLog logger = getLogger(); + final List<Rule> rules = criteria.getRules(); + + // consider each rule and hold a copy of the flowfile for each matched rule + for (final Rule rule : rules) { + // evaluate the rule + if (evaluateRule(context, rule, flowfile)) { + final FlowFile flowfileToUse; + + // determine if we should use the original flow file or clone + if (FlowFilePolicy.USE_ORIGINAL.equals(criteria.getFlowFilePolicy()) || matchedRules.isEmpty()) { + flowfileToUse = flowfile; + } else { + // clone the original for this rule + flowfileToUse = session.clone(flowfile); + } + + // store the flow file to use when executing this rule + List<Rule> rulesForFlowFile = matchedRules.get(flowfileToUse); + if (rulesForFlowFile == null) { + rulesForFlowFile = new ArrayList<>(); + matchedRules.put(flowfileToUse, rulesForFlowFile); + } + rulesForFlowFile.add(rule); + + // log if appropriate + if (logger.isDebugEnabled()) { + logger.debug(this + " all conditions met for rule '" + rule.getName() + "'. Using flow file - " + flowfileToUse); + } + } + } + + return !matchedRules.isEmpty(); + } + + /** + * Evaluates the specified rule on the specified flowfile. + * + * @param rule + * @param flowfile + * @return + */ + private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile) { + // go through each condition + for (final Condition condition : rule.getConditions()) { + + // fail if any condition is not met + if (!evaluateCondition(context, condition, flowfile)) { + return false; + } + } + + return true; + } + + private PropertyValue getPropertyValue(final String text, final ProcessContext context) { + PropertyValue currentValue = propertyValues.get(text); + if (currentValue == null) { + currentValue = context.newPropertyValue(text); + PropertyValue previousValue = propertyValues.putIfAbsent(text, currentValue); + if (previousValue != null) { + currentValue = previousValue; + } + } + + return currentValue; + } + + /** + * Evaluates the specified condition on the specified flowfile. + * + * @param condition + * @param flowfile + * @return + */ + private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile) { + try { + // evaluate the expression for the given flow file + return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile).asBoolean(); + } catch (final ProcessException pe) { + throw new ProcessException(String.format("Unable to evaluate condition '%s': %s.", condition.getExpression(), pe), pe); + } + } + + /** + * Executes the specified action on the specified flowfile. + * + * @param action + * @param flowfile + */ + private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List<Rule> rules, final Map<String, Action> defaultActions, final FlowFile flowfile) { + final ProcessorLog logger = getLogger(); + final Map<String, Action> actions = new HashMap<>(defaultActions); + final String ruleName = (rules == null || rules.isEmpty()) ? "default" : rules.get(rules.size() - 1).getName(); + + // if a rule matched, get its actions and possible overwrite the default ones + if (rules != null && rules.size() > 0) { + // combine all rules actions with the default actions... loop through the rules in order, this way + // subsequent matching rules will take precedence over previously matching rules and default values + for (final Rule rule : rules) { + for (final Action action : rule.getActions()) { + // store the action and overwrite the previous value (from the defaults or a previously matching rule) + actions.put(action.getAttribute(), action); + } + } + + // add an action for the matched rule - when matching multiple rules against + // the original flowfile (use original) this will leave the last matching + // rule's name as the value of this attribute. this decision was made since + // this would be the behavior if they user chained multiple UpdateAttributes + // together with 'use clone' specified + final Action matchedRuleAction = new Action(); + matchedRuleAction.setAttribute(getClass().getSimpleName() + ".matchedRule"); + matchedRuleAction.setValue(ruleName); + actions.put(matchedRuleAction.getAttribute(), matchedRuleAction); + } + + // attribute values that will be applied to the flow file + final Map<String, String> attributes = new HashMap<>(actions.size()); + + // go through each action + for (final Action action : actions.values()) { + try { + final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile).getValue(); + + // log if appropriate + if (logger.isDebugEnabled()) { + logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName)); + } + + attributes.put(action.getAttribute(), newAttributeValue); + } catch (final ProcessException pe) { + throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe); + } + } + + // If the 'alternate.identifier' attribute is added, then we want to create an ADD_INFO provenance event. + final String alternateIdentifier = attributes.get(CoreAttributes.ALTERNATE_IDENTIFIER.key()); + if (alternateIdentifier != null) { + try { + final URI uri = new URI(alternateIdentifier); + final String namespace = uri.getScheme(); + if (namespace != null) { + final String identifier = alternateIdentifier.substring(Math.min(namespace.length() + 1, alternateIdentifier.length() - 1)); + session.getProvenanceReporter().associate(flowfile, namespace, identifier); + } + } catch (final URISyntaxException e) { + } + } + + // update the flowfile attributes + return session.putAllAttributes(flowfile, attributes); + } + + /** + * Gets the default actions. + * + * @return + */ + private Map<String, Action> getDefaultActions(final Map<PropertyDescriptor, String> properties) { + final Map<String, Action> defaultActions = new HashMap<>(); + + for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { + final Action action = new Action(); + action.setAttribute(entry.getKey().getName()); + action.setValue(entry.getValue()); + defaultActions.put(action.getAttribute(), action); + } + + return defaultActions; + } + }
