This is an automated email from the ASF dual-hosted git repository. mosermw pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new b287b3ada6 NIFI-12386 Adds processor FilterAttribute attribute uuid is not removed by removeAttribute(s) in MockProcessSession b287b3ada6 is described below commit b287b3ada61b914308e6b8a08b3ac30ba069ca9f Author: EndzeitBegins <16666115+endzeitbeg...@users.noreply.github.com> AuthorDate: Sat Nov 18 00:28:07 2023 +0100 NIFI-12386 Adds processor FilterAttribute attribute uuid is not removed by removeAttribute(s) in MockProcessSession Signed-off-by: Mike Moser <mose...@apache.org> This closes #8064 --- .../java/org/apache/nifi/util/MockFlowFile.java | 5 + .../apache/nifi/util/TestMockProcessSession.java | 26 ++ .../nifi/processors/standard/FilterAttribute.java | 273 +++++++++++++++ .../services/org.apache.nifi.processor.Processor | 1 + .../processors/standard/TestFilterAttribute.java | 388 +++++++++++++++++++++ 5 files changed, 693 insertions(+) diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java index f5702daed1..ec86fe7656 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java @@ -189,6 +189,11 @@ public class MockFlowFile implements FlowFileRecord { public void removeAttributes(final Set<String> attrNames) { for (final String attrName : attrNames) { + if (CoreAttributes.UUID.key().equals(attrName)) { + // the core attribute "uuid" of a FlowFile cannot be altered / removed + continue; + } + attributes.remove(attrName); } } diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java index 700d0d29b2..3787918653 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java @@ -17,6 +17,7 @@ package org.apache.nifi.util; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -30,10 +31,13 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -182,6 +186,28 @@ public class TestMockProcessSession { output.get(0).assertAttributeEquals("key1", "val1"); } + @Test + void testAttributeUUIDNotRemovable() { + final Processor processor = new PoorlyBehavedProcessor(); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor, new MockStateManager(processor)); + FlowFile ff1 = session.createFlowFile("removeAttribute(attrName)".getBytes()); + FlowFile ff2 = session.createFlowFile("removeAllAttributes(attrNames)".getBytes()); + FlowFile ff3 = session.createFlowFile("removeAllAttributes(keyPattern)".getBytes()); + + String attrName = CoreAttributes.UUID.key(); + session.removeAttribute(ff1, attrName); + session.removeAllAttributes(ff2, new HashSet<>(Collections.singletonList(attrName))); + session.removeAllAttributes(ff3, Pattern.compile(Pattern.quote(attrName))); + + session.transfer(Arrays.asList(ff1, ff2, ff3), PoorlyBehavedProcessor.REL_FAILURE); + session.commitAsync(); + List<MockFlowFile> output = session.getFlowFilesForRelationship(PoorlyBehavedProcessor.REL_FAILURE); + assertEquals(3, output.size()); + output.get(0).assertAttributeEquals(attrName, ff1.getAttribute(attrName)); + output.get(1).assertAttributeEquals(attrName, ff2.getAttribute(attrName)); + output.get(2).assertAttributeEquals(attrName, ff3.getAttribute(attrName)); + } + protected static class PoorlyBehavedProcessor extends AbstractProcessor { private static final Relationship REL_FAILURE = new Relationship.Builder() diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterAttribute.java new file mode 100644 index 0000000000..0822868a02 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterAttribute.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.DefaultRunDuration; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +@SideEffectFree +@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"attributes", "modification", "filter", "retain", "remove", "delete", "regex", "regular expression", "Attribute Expression Language"}) +@CapabilityDescription("Filters the attributes of a FlowFile by retaining specified attributes and removing the rest or by removing specified attributes and retaining the rest.") +public class FilterAttribute extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .description("All successful FlowFiles are routed to this relationship").name("success").build(); + + private final static Set<Relationship> relationships = Collections.singleton(REL_SUCCESS); + + + public static final AllowableValue FILTER_MODE_VALUE_RETAIN = new AllowableValue( + "RETAIN", + "Retain", + "Retains only the attributes matching the filter, all other attributes are removed." + ); + + public static final AllowableValue FILTER_MODE_VALUE_REMOVE = new AllowableValue( + "REMOVE", + "Remove", + "Removes the attributes matching the filter, all other attributes are retained." + ); + + public static final PropertyDescriptor FILTER_MODE = new PropertyDescriptor.Builder() + .name("FILTER_MODE") + .displayName("Filter mode") + .description("Specifies the strategy to apply on filtered attributes. Either 'Remove' or 'Retain' only the matching attributes.") + .required(true) + .allowableValues(FILTER_MODE_VALUE_RETAIN, FILTER_MODE_VALUE_REMOVE) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .defaultValue(FILTER_MODE_VALUE_RETAIN.getValue()) + .build(); + + public static final AllowableValue MATCHING_STRATEGY_VALUE_ENUMERATION = new AllowableValue( + "ENUMERATION", + "Enumerate attributes", + "Provides a set of attribute keys to filter for, separated by a comma delimiter ','." + ); + + public static final AllowableValue MATCHING_STRATEGY_VALUE_REGEX = new AllowableValue( + "REGEX", + "Use regular expression", + "Provides a regular expression to match keys of attributes to filter for." + ); + + public static final PropertyDescriptor MATCHING_STRATEGY = new PropertyDescriptor.Builder() + .name("MATCHING_STRATEGY") + .displayName("Attribute matching strategy") + .description("Specifies the strategy to filter attributes by.") + .required(true) + .allowableValues(MATCHING_STRATEGY_VALUE_ENUMERATION, MATCHING_STRATEGY_VALUE_REGEX) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .defaultValue(MATCHING_STRATEGY_VALUE_ENUMERATION.getValue()) + .build(); + + public static final PropertyDescriptor ATTRIBUTE_SET = new PropertyDescriptor.Builder() + .name("ATTRIBUTE_SET") + .displayName("Set of attributes to filter") + .description("A set of attribute names to filter from FlowFiles. Each attribute name is separated by the comma delimiter ','.") + .required(true) + .dependsOn(MATCHING_STRATEGY, MATCHING_STRATEGY_VALUE_ENUMERATION) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor ATTRIBUTE_REGEX = new PropertyDescriptor.Builder() + .name("ATTRIBUTE_REGEX") + .displayName("Regular expression to filter attributes") + .description("A regular expression to match names of attributes to filter from FlowFiles.") + .required(true) + .dependsOn(MATCHING_STRATEGY, MATCHING_STRATEGY_VALUE_REGEX) + .addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + private final static String DELIMITER_VALUE = ","; + + private final static List<PropertyDescriptor> properties; + + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(FILTER_MODE); + _propertyDescriptors.add(MATCHING_STRATEGY); + _propertyDescriptors.add(ATTRIBUTE_SET); + _propertyDescriptors.add(ATTRIBUTE_REGEX); + properties = Collections.unmodifiableList(_propertyDescriptors); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + private volatile Predicate<String> cachedMatchingPredicate; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + final MatchingStrategy matchingStrategy = getMatchingStrategy(context); + + cachedMatchingPredicate = null; + + if (matchingStrategy == MatchingStrategy.ENUMERATION + && !context.getProperty(ATTRIBUTE_SET).isExpressionLanguagePresent()) { + cachedMatchingPredicate = determineMatchingPredicateBasedOnEnumeration(context, null); + } + if (matchingStrategy == MatchingStrategy.REGEX + && !context.getProperty(ATTRIBUTE_REGEX).isExpressionLanguagePresent()) { + cachedMatchingPredicate = determineMatchingPredicateBasedOnRegex(context, null); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Predicate<String> matchingPredicate = determineMatchingPredicate(context, flowFile); + + final FilterMode filterMode = getFilterMode(context); + final Predicate<String> isMatched; + if (filterMode == FilterMode.RETAIN) { + isMatched = matchingPredicate; + } else { + isMatched = matchingPredicate.negate(); + } + + final Set<String> attributesToRemove = new HashSet<>(flowFile.getAttributes().keySet()); + attributesToRemove.removeIf(isMatched); + + final FlowFile updatedFlowFile = session.removeAllAttributes(flowFile, attributesToRemove); + session.transfer(updatedFlowFile, REL_SUCCESS); + } + + private Predicate<String> determineMatchingPredicate(ProcessContext context, FlowFile flowFile) { + if (cachedMatchingPredicate != null) { + return cachedMatchingPredicate; + } + + final MatchingStrategy matchingStrategy = getMatchingStrategy(context); + + switch (matchingStrategy) { + case ENUMERATION: + return determineMatchingPredicateBasedOnEnumeration(context, flowFile); + case REGEX: + return determineMatchingPredicateBasedOnRegex(context, flowFile); + default: + throw new IllegalArgumentException( + "Cannot determine matching predicate for unsupported strategy " + matchingStrategy + " !" + ); + } + } + + private static Predicate<String> determineMatchingPredicateBasedOnEnumeration(ProcessContext context, FlowFile flowFile) { + final String attributeSetDeclaration = getAttributeSet(context, flowFile); + final String delimiter = getDelimiter(); + + Set<String> attributeSet = Arrays.stream(attributeSetDeclaration.split(Pattern.quote(delimiter))) + .map(String::trim) + .filter(attributeName -> !attributeName.trim().isEmpty()) + .collect(Collectors.toSet()); + + return attributeSet::contains; + } + + private static Predicate<String> determineMatchingPredicateBasedOnRegex(ProcessContext context, FlowFile flowFile) { + Pattern attributeRegex = getAttributeRegex(context, flowFile); + + return attributeName -> attributeRegex.matcher(attributeName).matches(); + } + + /* properties */ + + private static FilterMode getFilterMode(ProcessContext context) { + final String rawFilterMode = context + .getProperty(FILTER_MODE) + .getValue(); + + if (FILTER_MODE_VALUE_REMOVE.getValue().equals(rawFilterMode)) { + return FilterMode.REMOVE; + } + return FilterMode.RETAIN; + } + + private static MatchingStrategy getMatchingStrategy(ProcessContext context) { + final String rawMatchingStrategy = context + .getProperty(MATCHING_STRATEGY) + .getValue(); + + if (MATCHING_STRATEGY_VALUE_REGEX.getValue().equals(rawMatchingStrategy)) { + return MatchingStrategy.REGEX; + } + return MatchingStrategy.ENUMERATION; + } + + private static String getAttributeSet(ProcessContext context, FlowFile flowFile) { + return context.getProperty(ATTRIBUTE_SET).evaluateAttributeExpressions(flowFile).getValue(); + } + + private static String getDelimiter() { + return DELIMITER_VALUE; + } + + private static Pattern getAttributeRegex(ProcessContext context, FlowFile flowFile) { + return Pattern.compile( + context.getProperty(ATTRIBUTE_REGEX).evaluateAttributeExpressions(flowFile).getValue() + ); + } + + private enum FilterMode { + RETAIN, + REMOVE, + } + + private enum MatchingStrategy { + ENUMERATION, + REGEX, + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index cc1f10d424..54b928f3e2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -46,6 +46,7 @@ org.apache.nifi.processors.standard.FetchDistributedMapCache org.apache.nifi.processors.standard.FetchFile org.apache.nifi.processors.standard.FetchFTP org.apache.nifi.processors.standard.FetchSFTP +org.apache.nifi.processors.standard.FilterAttribute org.apache.nifi.processors.standard.FlattenJson org.apache.nifi.processors.standard.ForkRecord org.apache.nifi.processors.standard.ForkEnrichment diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterAttribute.java new file mode 100644 index 0000000000..4f9c36a63d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterAttribute.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class TestFilterAttribute { + + private final TestRunner runner = TestRunners.newTestRunner(FilterAttribute.class); + + private final String exampleContent = "lorem ipsum dolor sit amet"; + + private final Map<String, String> exampleAttributes = mapOf( + "foo", "fooValue", + "bar", "barValue", + "batz", "batzValue" + ); + + @Nested + class WithStrategyEnumeration { + @Nested + class InModeRetain { + @Test + void retainsAllAttributesWhenAllAreFiltered() { + final String attributeSet = "foo,bar,batz"; + final Set<String> expectedAttributes = setOf("foo", "bar", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void retainsUUIDAndFilteredAttributesWhenOnlySomeAreFiltered() { + final String attributeSet = "bar"; + final Set<String> expectedAttributes = setOf("bar", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void retainsUUIDOnlyWhenNoneOfTheAttributesAreFiltered() { + final String attributeSet = "other"; + final Set<String> expectedAttributes = setOf("uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void supportsAttributeNamesWithWhitespace() { + final Map<String, String> attributes = new HashMap<>(exampleAttributes); + attributes.put("fo\no", "some value"); + final String attributeSet = "fo\no"; + final Set<String> expectedAttributes = setOf("fo\no", "uuid"); + + runTestWith(attributes, attributeSet, expectedAttributes); + } + } + + @Nested + class InModeRemove { + + @BeforeEach + void setUp() { + runner.setProperty(FilterAttribute.FILTER_MODE, FilterAttribute.FILTER_MODE_VALUE_REMOVE); + } + + @Test + void removesAllAttributesExceptUUIDWhenAllAreFiltered() { + final String attributeSet = "foo,bar,batz,uuid,path,filename"; + final Set<String> expectedAttributes = setOf("uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void removesFilteredAttributesExceptUUIDWhenOnlySomeAreFiltered() { + final String attributeSet = "bar,uuid,path,filename"; + final Set<String> expectedAttributes = setOf("foo", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void removesNoAttributeWhenNoneOfTheAttributesAreFiltered() { + final String attributeSet = "other"; + final Set<String> expectedAttributes = setOf("foo", "bar", "batz", "uuid", "path", "filename"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void supportsAttributeNamesWithWhitespace() { + final Map<String, String> attributes = new HashMap<>(exampleAttributes); + attributes.put("fo\no", "some value"); + final String attributeSet = "fo\no"; + final Set<String> expectedAttributes = setOf("foo", "bar", "batz", "uuid", "path", "filename"); + + runTestWith(attributes, attributeSet, expectedAttributes); + } + } + + @Nested + class RegardingAttributeSetParsing { + + @Test + void ignoresLeadingDelimiters() { + final String attributeSet = ",foo,bar"; + final Set<String> expectedAttributes = setOf("foo", "bar", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void ignoresTrailingDelimiters() { + final String attributeSet = "foo,bar,"; + final Set<String> expectedAttributes = setOf("foo", "bar", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void doesNotYieldErrorWhenAttributeSetIsEffectivelyEmpty() { + final String attributeSet = " , "; + final Set<String> expectedAttributes = setOf("uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void worksWithSingleAttributeInSet() { + final String attributeSet = "batz"; + final Set<String> expectedAttributes = setOf("batz", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void worksWithMultipleAttributesInSet() { + final String attributeSet = "foo,bar,batz"; + final Set<String> expectedAttributes = setOf("foo", "bar", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void ignoresLeadingWhitespaceInAttributeName() { + final String attributeSet = "foo, batz"; + final Set<String> expectedAttributes = setOf("foo", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + + @Test + void ignoresTrailingWhitespaceInAttributeName() { + final String attributeSet = "foo ,bar"; + final Set<String> expectedAttributes = setOf("foo", "bar", "uuid"); + + runTestWith(exampleAttributes, attributeSet, expectedAttributes); + } + } + + @Test + void supportsDefiningAttributeSetInFlowFileAttribute() { + final Map<String, String> attributes = new HashMap<>(exampleAttributes); + attributes.put("lookup", "bar,batz"); + final String attributeSet = "${lookup}"; // NiFi EL with reference to FlowFile attribute + final Set<String> expectedAttributes = setOf("bar", "batz", "uuid"); + + runTestWith(attributes, attributeSet, expectedAttributes); + } + + private void runTestWith(Map<String, String> attributes, String attributeSet, Set<String> expectedAttributes) { + runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MATCHING_STRATEGY_VALUE_ENUMERATION); + runner.setProperty(FilterAttribute.ATTRIBUTE_SET, attributeSet); + + final MockFlowFile input = runner.enqueue(exampleContent, attributes); + final Map<String, String> inputAttributes = input.getAttributes(); + final Set<String> notExpectedAttributes = new HashSet<>(inputAttributes.keySet()); + notExpectedAttributes.removeAll(expectedAttributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, 1); + final MockFlowFile result = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS).get(0); + result.assertContentEquals(exampleContent); + for (String expectedName : expectedAttributes) { + final String expectedValue = inputAttributes.get(expectedName); + + result.assertAttributeEquals(expectedName, expectedValue); + } + for (String notExpectedName : notExpectedAttributes) { + result.assertAttributeNotExists(notExpectedName); + } + } + } + + @Nested + class WithStrategyRegex { + + @Nested + class InModeRetain { + @Test + void retainsAllAttributesWhenAllAreFiltered() { + final Pattern attributeRegex = Pattern.compile("foo|bar|batz"); + final Set<String> expectedAttributes = setOf("foo", "bar", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void retainsUUIDAndFilteredAttributesWhenOnlySomeAreFiltered() { + final Pattern attributeRegex = Pattern.compile("bar"); + final Set<String> expectedAttributes = setOf("bar", "uuid"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void retainsUUIDOnlyWhenNoneOfTheAttributesAreFiltered() { + final Pattern attributeRegex = Pattern.compile("other"); + final Set<String> expectedAttributes = setOf("uuid"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void supportsAttributeNamesWithWhitespace() { + final Map<String, String> attributes = new HashMap<>(exampleAttributes); + attributes.put("fo\no", "some value"); + final Pattern attributeRegex = Pattern.compile("fo\no"); + final Set<String> expectedAttributes = setOf("fo\no", "uuid"); + + runTestWith(attributes, attributeRegex, expectedAttributes); + } + } + + @Nested + class InModeRemove { + + @BeforeEach + void setUp() { + runner.setProperty(FilterAttribute.FILTER_MODE, FilterAttribute.FILTER_MODE_VALUE_REMOVE); + } + + @Test + void removesAllAttributesExceptUUIDWhenAllAreFiltered() { + final Pattern attributeRegex = Pattern.compile("foo|bar|batz|uuid|path|filename"); + final Set<String> expectedAttributes = setOf("uuid"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void removesFilteredAttributesExceptUUIDWhenOnlySomeAreFiltered() { + final Pattern attributeRegex = Pattern.compile("bar|uuid|path|filename"); + final Set<String> expectedAttributes = setOf("foo", "batz", "uuid"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void removesNoAttributeWhenNoneOfTheAttributesAreFiltered() { + final Pattern attributeRegex = Pattern.compile("other"); + final Set<String> expectedAttributes = setOf("foo", "bar", "batz", "uuid", "path", "filename"); + + runTestWith(exampleAttributes, attributeRegex, expectedAttributes); + } + + @Test + void supportsAttributeNamesWithWhitespace() { + final Map<String, String> attributes = new HashMap<>(exampleAttributes); + attributes.put("fo\no", "some value"); + final Pattern attributeRegex = Pattern.compile("fo\no"); + final Set<String> expectedAttributes = setOf("foo", "bar", "batz", "uuid", "path", "filename"); + + runTestWith(attributes, attributeRegex, expectedAttributes); + } + } + + @Test + void supportsDefiningAttributeSetInFlowFileAttribute() { + final Map<String, String> attributes = new HashMap<>(exampleAttributes); + attributes.put("lookup", "bar|batz"); + final String attributeRegex = "${lookup}"; // NiFi EL with reference to FlowFile attribute + final Set<String> expectedAttributes = setOf("bar", "batz", "uuid"); + + runTestWith(attributes, attributeRegex, expectedAttributes); + } + + private void runTestWith(Map<String, String> attributes, Pattern regex, Set<String> expectedAttributes) { + runTestWith(attributes, regex.pattern(), expectedAttributes); + } + + private void runTestWith(Map<String, String> attributes, String regexPattern, Set<String> expectedAttributes) { + runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MATCHING_STRATEGY_VALUE_REGEX); + runner.setProperty(FilterAttribute.ATTRIBUTE_REGEX, regexPattern); + + final MockFlowFile input = runner.enqueue(exampleContent, attributes); + final Map<String, String> inputAttributes = input.getAttributes(); + final Set<String> notExpectedAttributes = new HashSet<>(inputAttributes.keySet()); + notExpectedAttributes.removeAll(expectedAttributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, 1); + final MockFlowFile result = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS).get(0); + result.assertContentEquals(exampleContent); + for (String expectedName : expectedAttributes) { + final String expectedValue = inputAttributes.get(expectedName); + + result.assertAttributeEquals(expectedName, expectedValue); + } + for (String notExpectedName : notExpectedAttributes) { + result.assertAttributeNotExists(notExpectedName); + } + } + } + + + @Test + void supportMultiThreadedExecution() { + runner.setThreadCount(5); + + final int flowFileCount = 10_000; + for (int i = 0; i < flowFileCount; i++) { + runner.enqueue(exampleContent, mapOf( + "foo", "" + i, + "bar", "" + i + )); + } + runner.setProperty(FilterAttribute.ATTRIBUTE_SET, "foo"); + + runner.run(flowFileCount); + runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, flowFileCount); + List<MockFlowFile> resultFlowFiles = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS); + for (final MockFlowFile resultFlowFile : resultFlowFiles) { + resultFlowFile.assertAttributeExists("foo"); + resultFlowFile.assertAttributeNotExists("bar"); + } + final Set<String> fooValues = resultFlowFiles.stream() + .map(flowFile -> flowFile.getAttribute("foo")) + .collect(Collectors.toSet()); + assertEquals(flowFileCount, fooValues.size()); + } + + private static Map<String, String> mapOf(String... keyValues) { + final HashMap<String, String> map = new HashMap<>(); + + for (int i = 0; i < keyValues.length - 1; i += 2) { + map.put(keyValues[i], keyValues[i + 1]); + } + + return map; + } + + private static Set<String> setOf(String... values) { + return new HashSet<>(Arrays.asList(values)); + } +} \ No newline at end of file