This is an automated email from the ASF dual-hosted git repository.

mosermw pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new b287b3ada6 NIFI-12386 Adds processor FilterAttribute attribute uuid is 
not removed by removeAttribute(s) in MockProcessSession
b287b3ada6 is described below

commit b287b3ada61b914308e6b8a08b3ac30ba069ca9f
Author: EndzeitBegins <16666115+endzeitbeg...@users.noreply.github.com>
AuthorDate: Sat Nov 18 00:28:07 2023 +0100

    NIFI-12386 Adds processor FilterAttribute
    attribute uuid is not removed by removeAttribute(s) in MockProcessSession
    
    Signed-off-by: Mike Moser <mose...@apache.org>
    
    This closes #8064
---
 .../java/org/apache/nifi/util/MockFlowFile.java    |   5 +
 .../apache/nifi/util/TestMockProcessSession.java   |  26 ++
 .../nifi/processors/standard/FilterAttribute.java  | 273 +++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../processors/standard/TestFilterAttribute.java   | 388 +++++++++++++++++++++
 5 files changed, 693 insertions(+)

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index f5702daed1..ec86fe7656 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -189,6 +189,11 @@ public class MockFlowFile implements FlowFileRecord {
 
     public void removeAttributes(final Set<String> attrNames) {
         for (final String attrName : attrNames) {
+            if (CoreAttributes.UUID.key().equals(attrName)) {
+                // the core attribute "uuid" of a FlowFile cannot be altered / 
removed
+                continue;
+            }
+
             attributes.remove(attrName);
         }
     }
diff --git 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index 700d0d29b2..3787918653 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.util;
 
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -30,10 +31,13 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -182,6 +186,28 @@ public class TestMockProcessSession {
         output.get(0).assertAttributeEquals("key1", "val1");
     }
 
+    /**
+     * Verifies that the core attribute "uuid" is still present after each of the three removal calls
+     * exercised here: removal by name, by a set of names, and by a key pattern.
+     */
+    @Test
+    void testAttributeUUIDNotRemovable() {
+        final Processor processor = new PoorlyBehavedProcessor();
+        final SharedSessionState sharedState = new SharedSessionState(processor, new AtomicLong(0L));
+        final MockProcessSession session = new MockProcessSession(sharedState, processor, new MockStateManager(processor));
+
+        // one FlowFile per removal call under test
+        final FlowFile removedByName = session.createFlowFile("removeAttribute(attrName)".getBytes());
+        final FlowFile removedByNameSet = session.createFlowFile("removeAllAttributes(attrNames)".getBytes());
+        final FlowFile removedByPattern = session.createFlowFile("removeAllAttributes(keyPattern)".getBytes());
+
+        final String uuidKey = CoreAttributes.UUID.key();
+        final Set<String> uuidKeySet = new HashSet<>(Collections.singletonList(uuidKey));
+        final Pattern uuidKeyPattern = Pattern.compile(Pattern.quote(uuidKey));
+        session.removeAttribute(removedByName, uuidKey);
+        session.removeAllAttributes(removedByNameSet, uuidKeySet);
+        session.removeAllAttributes(removedByPattern, uuidKeyPattern);
+
+        final List<FlowFile> inputs = Arrays.asList(removedByName, removedByNameSet, removedByPattern);
+        session.transfer(inputs, PoorlyBehavedProcessor.REL_FAILURE);
+        session.commitAsync();
+
+        final List<MockFlowFile> output = session.getFlowFilesForRelationship(PoorlyBehavedProcessor.REL_FAILURE);
+        assertEquals(3, output.size());
+        for (int i = 0; i < inputs.size(); i++) {
+            output.get(i).assertAttributeEquals(uuidKey, inputs.get(i).getAttribute(uuidKey));
+        }
+    }
+
     protected static class PoorlyBehavedProcessor extends AbstractProcessor {
 
         private static final Relationship REL_FAILURE = new 
Relationship.Builder()
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterAttribute.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterAttribute.java
new file mode 100644
index 0000000000..0822868a02
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterAttribute.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.DefaultRunDuration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@SideEffectFree
+@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"attributes", "modification", "filter", "retain", "remove", "delete", 
"regex", "regular expression", "Attribute Expression Language"})
+@CapabilityDescription("Filters the attributes of a FlowFile by retaining 
specified attributes and removing the rest or by removing specified attributes 
and retaining the rest.")
+public class FilterAttribute extends AbstractProcessor {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .description("All successful FlowFiles are routed to this 
relationship").name("success").build();
+
+    private final static Set<Relationship> relationships = 
Collections.singleton(REL_SUCCESS);
+
+
+    public static final AllowableValue FILTER_MODE_VALUE_RETAIN = new 
AllowableValue(
+            "RETAIN",
+            "Retain",
+            "Retains only the attributes matching the filter, all other 
attributes are removed."
+    );
+
+    public static final AllowableValue FILTER_MODE_VALUE_REMOVE = new 
AllowableValue(
+            "REMOVE",
+            "Remove",
+            "Removes the attributes matching the filter, all other attributes 
are retained."
+    );
+
+    public static final PropertyDescriptor FILTER_MODE = new 
PropertyDescriptor.Builder()
+            .name("FILTER_MODE")
+            .displayName("Filter mode")
+            .description("Specifies the strategy to apply on filtered 
attributes. Either 'Remove' or 'Retain' only the matching attributes.")
+            .required(true)
+            .allowableValues(FILTER_MODE_VALUE_RETAIN, 
FILTER_MODE_VALUE_REMOVE)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .defaultValue(FILTER_MODE_VALUE_RETAIN.getValue())
+            .build();
+
+    public static final AllowableValue MATCHING_STRATEGY_VALUE_ENUMERATION = 
new AllowableValue(
+            "ENUMERATION",
+            "Enumerate attributes",
+            "Provides a set of attribute keys to filter for, separated by a 
comma delimiter ','."
+    );
+
+    public static final AllowableValue MATCHING_STRATEGY_VALUE_REGEX = new 
AllowableValue(
+            "REGEX",
+            "Use regular expression",
+            "Provides a regular expression to match keys of attributes to 
filter for."
+    );
+
+    public static final PropertyDescriptor MATCHING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("MATCHING_STRATEGY")
+            .displayName("Attribute matching strategy")
+            .description("Specifies the strategy to filter attributes by.")
+            .required(true)
+            .allowableValues(MATCHING_STRATEGY_VALUE_ENUMERATION, 
MATCHING_STRATEGY_VALUE_REGEX)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .defaultValue(MATCHING_STRATEGY_VALUE_ENUMERATION.getValue())
+            .build();
+
+    public static final PropertyDescriptor ATTRIBUTE_SET = new 
PropertyDescriptor.Builder()
+            .name("ATTRIBUTE_SET")
+            .displayName("Set of attributes to filter")
+            .description("A set of attribute names to filter from FlowFiles. 
Each attribute name is separated by the comma delimiter ','.")
+            .required(true)
+            .dependsOn(MATCHING_STRATEGY, MATCHING_STRATEGY_VALUE_ENUMERATION)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor ATTRIBUTE_REGEX = new 
PropertyDescriptor.Builder()
+            .name("ATTRIBUTE_REGEX")
+            .displayName("Regular expression to filter attributes")
+            .description("A regular expression to match names of attributes to 
filter from FlowFiles.")
+            .required(true)
+            .dependsOn(MATCHING_STRATEGY, MATCHING_STRATEGY_VALUE_REGEX)
+            
.addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    private final static String DELIMITER_VALUE = ",";
+
+    // Supported properties, in registration order: filter mode, matching strategy, then the two
+    // strategy-specific filter definitions.
+    private static final List<PropertyDescriptor> properties;
+
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        Collections.addAll(descriptors, FILTER_MODE, MATCHING_STRATEGY, ATTRIBUTE_SET, ATTRIBUTE_REGEX);
+        properties = Collections.unmodifiableList(descriptors);
+    }
+
+    /**
+     * Returns the relationships of this processor: the singleton set containing {@link #REL_SUCCESS}.
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Returns the unmodifiable list of property descriptors assembled in the static initializer.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    private volatile Predicate<String> cachedMatchingPredicate;
+
+    /**
+     * Pre-computes the attribute-name predicate when the filter property of the active matching strategy
+     * contains no Expression Language. In every other case the cache is left empty ({@code null}).
+     *
+     * @param context the process context the matching strategy and filter properties are read from
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final MatchingStrategy strategy = getMatchingStrategy(context);
+
+        // always start from an empty cache; it is only refilled below when the filter is static
+        cachedMatchingPredicate = null;
+
+        switch (strategy) {
+            case ENUMERATION:
+                if (!context.getProperty(ATTRIBUTE_SET).isExpressionLanguagePresent()) {
+                    cachedMatchingPredicate = determineMatchingPredicateBasedOnEnumeration(context, null);
+                }
+                break;
+            case REGEX:
+                if (!context.getProperty(ATTRIBUTE_REGEX).isExpressionLanguagePresent()) {
+                    cachedMatchingPredicate = determineMatchingPredicateBasedOnRegex(context, null);
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Takes one FlowFile from the session, removes the attributes selected by the configured filter mode
+     * and matching predicate, and transfers the result to {@link #REL_SUCCESS}.
+     * Returns without doing anything when the session provides no FlowFile.
+     *
+     * @param context the process context the filter configuration is read from
+     * @param session the session the FlowFile is taken from and transferred with
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final Predicate<String> matchesFilter = determineMatchingPredicate(context, flowFile);
+
+        // RETAIN removes every attribute that does NOT match the filter, REMOVE removes every one that does
+        final Predicate<String> shouldRemove = getFilterMode(context) == FilterMode.RETAIN
+                ? matchesFilter.negate()
+                : matchesFilter;
+
+        final Set<String> attributesToRemove = new HashSet<>();
+        for (final String attributeName : flowFile.getAttributes().keySet()) {
+            if (shouldRemove.test(attributeName)) {
+                attributesToRemove.add(attributeName);
+            }
+        }
+
+        session.transfer(session.removeAllAttributes(flowFile, attributesToRemove), REL_SUCCESS);
+    }
+
+    /**
+     * Resolves the predicate that decides whether an attribute name matches the configured filter.
+     * The predicate cached by {@link #onScheduled(ProcessContext)} is used when present; otherwise one is
+     * built for the given FlowFile according to the matching strategy.
+     *
+     * @param context  the process context the properties are read from
+     * @param flowFile the FlowFile passed on for evaluating the filter property
+     * @return predicate over attribute names
+     * @throws IllegalArgumentException if the matching strategy is not handled by this method
+     */
+    private Predicate<String> determineMatchingPredicate(ProcessContext context, FlowFile flowFile) {
+        // read the volatile field once so the null check and the returned reference cannot differ
+        final Predicate<String> cached = cachedMatchingPredicate;
+        if (cached != null) {
+            return cached;
+        }
+
+        final MatchingStrategy matchingStrategy = getMatchingStrategy(context);
+
+        switch (matchingStrategy) {
+            case ENUMERATION:
+                return determineMatchingPredicateBasedOnEnumeration(context, flowFile);
+            case REGEX:
+                return determineMatchingPredicateBasedOnRegex(context, flowFile);
+            default:
+                throw new IllegalArgumentException(
+                        "Cannot determine matching predicate for unsupported strategy " + matchingStrategy + " !"
+                );
+        }
+    }
+
+    /**
+     * Builds a predicate that accepts exactly the attribute names listed in the enumeration property.
+     * The property value is split on the delimiter, each entry is trimmed, and empty entries are dropped.
+     *
+     * @param context  the process context the property is read from
+     * @param flowFile the FlowFile used to evaluate Expression Language; {@code null} is passed by
+     *                 {@link #onScheduled(ProcessContext)} when no Expression Language is present
+     * @return predicate that is true for names contained in the configured set
+     */
+    private static Predicate<String> determineMatchingPredicateBasedOnEnumeration(ProcessContext context, FlowFile flowFile) {
+        final String attributeSetDeclaration = getAttributeSet(context, flowFile);
+        final String delimiter = getDelimiter();
+
+        final Set<String> attributeSet = Arrays.stream(attributeSetDeclaration.split(Pattern.quote(delimiter)))
+                .map(String::trim)
+                // names are already trimmed at this point, so a plain emptiness check is sufficient
+                .filter(attributeName -> !attributeName.isEmpty())
+                .collect(Collectors.toSet());
+
+        return attributeSet::contains;
+    }
+
+    /**
+     * Builds a predicate that accepts attribute names fully matching the configured regular expression.
+     *
+     * @param context  the process context the property is read from
+     * @param flowFile the FlowFile used to evaluate Expression Language; {@code null} is passed by
+     *                 {@link #onScheduled(ProcessContext)} when no Expression Language is present
+     * @return predicate that is true for names the pattern matches entirely
+     */
+    private static Predicate<String> determineMatchingPredicateBasedOnRegex(ProcessContext context, FlowFile flowFile) {
+        final Pattern namePattern = getAttributeRegex(context, flowFile);
+
+        // matches() requires the whole attribute name to match, not just a part of it
+        return candidate -> namePattern.matcher(candidate).matches();
+    }
+
+    /* properties */
+
+    /**
+     * Reads the filter mode property. Yields {@code REMOVE} only when the configured value equals the
+     * REMOVE allowable value; any other value (including {@code null}) yields {@code RETAIN}.
+     */
+    private static FilterMode getFilterMode(ProcessContext context) {
+        final String configuredMode = context.getProperty(FILTER_MODE).getValue();
+
+        return FILTER_MODE_VALUE_REMOVE.getValue().equals(configuredMode)
+                ? FilterMode.REMOVE
+                : FilterMode.RETAIN;
+    }
+
+    /**
+     * Reads the matching strategy property. Yields {@code REGEX} only when the configured value equals the
+     * REGEX allowable value; any other value (including {@code null}) yields {@code ENUMERATION}.
+     */
+    private static MatchingStrategy getMatchingStrategy(ProcessContext context) {
+        final String configuredStrategy = context.getProperty(MATCHING_STRATEGY).getValue();
+
+        return MATCHING_STRATEGY_VALUE_REGEX.getValue().equals(configuredStrategy)
+                ? MatchingStrategy.REGEX
+                : MatchingStrategy.ENUMERATION;
+    }
+
+    /**
+     * Returns the value of the enumeration property after evaluating it against the given FlowFile.
+     */
+    private static String getAttributeSet(ProcessContext context, FlowFile flowFile) {
+        return context
+                .getProperty(ATTRIBUTE_SET)
+                .evaluateAttributeExpressions(flowFile)
+                .getValue();
+    }
+
+    /**
+     * Returns the delimiter that separates attribute names in the enumeration property (a comma).
+     */
+    private static String getDelimiter() {
+        return DELIMITER_VALUE;
+    }
+
+    /**
+     * Evaluates the regular-expression property against the given FlowFile and compiles the result.
+     *
+     * @throws java.util.regex.PatternSyntaxException if the evaluated value is not a valid regular expression
+     */
+    private static Pattern getAttributeRegex(ProcessContext context, FlowFile flowFile) {
+        final String regex = context.getProperty(ATTRIBUTE_REGEX).evaluateAttributeExpressions(flowFile).getValue();
+
+        return Pattern.compile(regex);
+    }
+
+    /** What happens to attributes matching the filter: only they are kept (RETAIN) or they are dropped (REMOVE). */
+    private enum FilterMode {
+        RETAIN,
+        REMOVE
+    }
+
+    /** How the filter is expressed: as a delimited list of names (ENUMERATION) or as a regular expression (REGEX). */
+    private enum MatchingStrategy {
+        ENUMERATION,
+        REGEX
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index cc1f10d424..54b928f3e2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -46,6 +46,7 @@ org.apache.nifi.processors.standard.FetchDistributedMapCache
 org.apache.nifi.processors.standard.FetchFile
 org.apache.nifi.processors.standard.FetchFTP
 org.apache.nifi.processors.standard.FetchSFTP
+org.apache.nifi.processors.standard.FilterAttribute
 org.apache.nifi.processors.standard.FlattenJson
 org.apache.nifi.processors.standard.ForkRecord
 org.apache.nifi.processors.standard.ForkEnrichment
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterAttribute.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterAttribute.java
new file mode 100644
index 0000000000..4f9c36a63d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterAttribute.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestFilterAttribute {
+
+    private final TestRunner runner = 
TestRunners.newTestRunner(FilterAttribute.class);
+
+    private final String exampleContent = "lorem ipsum dolor sit amet";
+
+    private final Map<String, String> exampleAttributes = mapOf(
+            "foo", "fooValue",
+            "bar", "barValue",
+            "batz", "batzValue"
+    );
+
+    @Nested
+    class WithStrategyEnumeration {
+        @Nested
+        class InModeRetain {
+            @Test
+            void retainsAllAttributesWhenAllAreFiltered() {
+                final String attributeSet = "foo,bar,batz";
+                final Set<String> expectedAttributes = setOf("foo", "bar", 
"batz", "uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void retainsUUIDAndFilteredAttributesWhenOnlySomeAreFiltered() {
+                final String attributeSet = "bar";
+                final Set<String> expectedAttributes = setOf("bar", "uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void retainsUUIDOnlyWhenNoneOfTheAttributesAreFiltered() {
+                final String attributeSet = "other";
+                final Set<String> expectedAttributes = setOf("uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void supportsAttributeNamesWithWhitespace() {
+                final Map<String, String> attributes = new 
HashMap<>(exampleAttributes);
+                attributes.put("fo\no", "some value");
+                final String attributeSet = "fo\no";
+                final Set<String> expectedAttributes = setOf("fo\no", "uuid");
+
+                runTestWith(attributes, attributeSet, expectedAttributes);
+            }
+        }
+
+        @Nested
+        class InModeRemove {
+
+            @BeforeEach
+            void setUp() {
+                runner.setProperty(FilterAttribute.FILTER_MODE, 
FilterAttribute.FILTER_MODE_VALUE_REMOVE);
+            }
+
+            @Test
+            void removesAllAttributesExceptUUIDWhenAllAreFiltered() {
+                final String attributeSet = "foo,bar,batz,uuid,path,filename";
+                final Set<String> expectedAttributes = setOf("uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void removesFilteredAttributesExceptUUIDWhenOnlySomeAreFiltered() {
+                final String attributeSet = "bar,uuid,path,filename";
+                final Set<String> expectedAttributes = setOf("foo", "batz", 
"uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void removesNoAttributeWhenNoneOfTheAttributesAreFiltered() {
+                final String attributeSet = "other";
+                final Set<String> expectedAttributes = setOf("foo", "bar", 
"batz", "uuid", "path", "filename");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void supportsAttributeNamesWithWhitespace() {
+                final Map<String, String> attributes = new 
HashMap<>(exampleAttributes);
+                attributes.put("fo\no", "some value");
+                final String attributeSet = "fo\no";
+                final Set<String> expectedAttributes = setOf("foo", "bar", 
"batz", "uuid", "path", "filename");
+
+                runTestWith(attributes, attributeSet, expectedAttributes);
+            }
+        }
+
+        @Nested
+        class RegardingAttributeSetParsing {
+
+            @Test
+            void ignoresLeadingDelimiters() {
+                final String attributeSet = ",foo,bar";
+                final Set<String> expectedAttributes = setOf("foo", "bar", 
"uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void ignoresTrailingDelimiters() {
+                final String attributeSet = "foo,bar,";
+                final Set<String> expectedAttributes = setOf("foo", "bar", 
"uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void doesNotYieldErrorWhenAttributeSetIsEffectivelyEmpty() {
+                final String attributeSet = " , ";
+                final Set<String> expectedAttributes = setOf("uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void worksWithSingleAttributeInSet() {
+                final String attributeSet = "batz";
+                final Set<String> expectedAttributes = setOf("batz", "uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void worksWithMultipleAttributesInSet() {
+                final String attributeSet = "foo,bar,batz";
+                final Set<String> expectedAttributes = setOf("foo", "bar", 
"batz", "uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void ignoresLeadingWhitespaceInAttributeName() {
+                final String attributeSet = "foo,  batz";
+                final Set<String> expectedAttributes = setOf("foo", "batz", 
"uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+
+            @Test
+            void ignoresTrailingWhitespaceInAttributeName() {
+                final String attributeSet = "foo  ,bar";
+                final Set<String> expectedAttributes = setOf("foo", "bar", 
"uuid");
+
+                runTestWith(exampleAttributes, attributeSet, 
expectedAttributes);
+            }
+        }
+
+        @Test
+        void supportsDefiningAttributeSetInFlowFileAttribute() {
+            final Map<String, String> attributes = new 
HashMap<>(exampleAttributes);
+            attributes.put("lookup", "bar,batz");
+            final String attributeSet = "${lookup}"; // NiFi EL with reference 
to FlowFile attribute
+            final Set<String> expectedAttributes = setOf("bar", "batz", 
"uuid");
+
+            runTestWith(attributes, attributeSet, expectedAttributes);
+        }
+
+        /**
+         * Runs the processor with the enumeration strategy on a single FlowFile and verifies the outcome.
+         *
+         * @param attributes         attributes of the enqueued FlowFile
+         * @param attributeSet       value assigned to the {@code ATTRIBUTE_SET} property
+         * @param expectedAttributes names of the attributes that must still be present with their input values;
+         *                           every other input attribute must be absent
+         */
+        private void runTestWith(Map<String, String> attributes, String attributeSet, Set<String> expectedAttributes) {
+            runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MATCHING_STRATEGY_VALUE_ENUMERATION);
+            runner.setProperty(FilterAttribute.ATTRIBUTE_SET, attributeSet);
+
+            final MockFlowFile enqueued = runner.enqueue(exampleContent, attributes);
+            final Map<String, String> inputAttributes = enqueued.getAttributes();
+            // snapshot, taken before the run, of every input attribute that is not expected to survive
+            final Set<String> notExpectedAttributes = inputAttributes.keySet().stream()
+                    .filter(name -> !expectedAttributes.contains(name))
+                    .collect(Collectors.toSet());
+
+            runner.run();
+
+            runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, 1);
+            final MockFlowFile result = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS).get(0);
+            result.assertContentEquals(exampleContent);
+            expectedAttributes.forEach(name -> result.assertAttributeEquals(name, inputAttributes.get(name)));
+            notExpectedAttributes.forEach(result::assertAttributeNotExists);
+        }
+    }
+
+    @Nested
+    class WithStrategyRegex {
+
+        @Nested
+        class InModeRetain {
+            @Test
+            void retainsAllAttributesWhenAllAreFiltered() {
+                final Pattern attributeRegex = Pattern.compile("foo|bar|batz");
+                final Set<String> expectedAttributes = setOf("foo", "bar", 
"batz", "uuid");
+
+                runTestWith(exampleAttributes, attributeRegex, 
expectedAttributes);
+            }
+
+            @Test
+            void retainsUUIDAndFilteredAttributesWhenOnlySomeAreFiltered() {
+                final Pattern attributeRegex = Pattern.compile("bar");
+                final Set<String> expectedAttributes = setOf("bar", "uuid");
+
+                runTestWith(exampleAttributes, attributeRegex, 
expectedAttributes);
+            }
+
+            @Test
+            void retainsUUIDOnlyWhenNoneOfTheAttributesAreFiltered() {
+                final Pattern attributeRegex = Pattern.compile("other");
+                final Set<String> expectedAttributes = setOf("uuid");
+
+                runTestWith(exampleAttributes, attributeRegex, 
expectedAttributes);
+            }
+
+            @Test
+            void supportsAttributeNamesWithWhitespace() {
+                final Map<String, String> attributes = new 
HashMap<>(exampleAttributes);
+                attributes.put("fo\no", "some value");
+                final Pattern attributeRegex = Pattern.compile("fo\no");
+                final Set<String> expectedAttributes = setOf("fo\no", "uuid");
+
+                runTestWith(attributes, attributeRegex, expectedAttributes);
+            }
+        }
+
+        @Nested
+        class InModeRemove {
+
+            @BeforeEach
+            void setUp() {
+                runner.setProperty(FilterAttribute.FILTER_MODE, 
FilterAttribute.FILTER_MODE_VALUE_REMOVE);
+            }
+
+            @Test
+            void removesAllAttributesExceptUUIDWhenAllAreFiltered() {
+                final Pattern attributeRegex = 
Pattern.compile("foo|bar|batz|uuid|path|filename");
+                final Set<String> expectedAttributes = setOf("uuid");
+
+                runTestWith(exampleAttributes, attributeRegex, 
expectedAttributes);
+            }
+
+            @Test
+            void removesFilteredAttributesExceptUUIDWhenOnlySomeAreFiltered() {
+                final Pattern attributeRegex = 
Pattern.compile("bar|uuid|path|filename");
+                final Set<String> expectedAttributes = setOf("foo", "batz", 
"uuid");
+
+                runTestWith(exampleAttributes, attributeRegex, 
expectedAttributes);
+            }
+
+            @Test
+            void removesNoAttributeWhenNoneOfTheAttributesAreFiltered() {
+                final Pattern attributeRegex = Pattern.compile("other");
+                final Set<String> expectedAttributes = setOf("foo", "bar", 
"batz", "uuid", "path", "filename");
+
+                runTestWith(exampleAttributes, attributeRegex, 
expectedAttributes);
+            }
+
+            @Test
+            void supportsAttributeNamesWithWhitespace() {
+                final Map<String, String> attributes = new 
HashMap<>(exampleAttributes);
+                attributes.put("fo\no", "some value");
+                final Pattern attributeRegex = Pattern.compile("fo\no");
+                final Set<String> expectedAttributes = setOf("foo", "bar", 
"batz", "uuid", "path", "filename");
+
+                runTestWith(attributes, attributeRegex, expectedAttributes);
+            }
+        }
+
+        @Test
+        void supportsDefiningAttributeSetInFlowFileAttribute() {
+            final Map<String, String> attributes = new 
HashMap<>(exampleAttributes);
+            attributes.put("lookup", "bar|batz");
+            final String attributeRegex = "${lookup}"; // NiFi EL with 
reference to FlowFile attribute
+            final Set<String> expectedAttributes = setOf("bar", "batz", 
"uuid");
+
+            runTestWith(attributes, attributeRegex, expectedAttributes);
+        }
+
+        /**
+         * Convenience overload that forwards the textual form of the given pattern to the String variant.
+         */
+        private void runTestWith(Map<String, String> attributes, Pattern regex, Set<String> expectedAttributes) {
+            final String regexPattern = regex.pattern();
+            runTestWith(attributes, regexPattern, expectedAttributes);
+        }
+
+        /**
+         * Runs the processor with the regex strategy on a single FlowFile and verifies the outcome.
+         *
+         * @param attributes         attributes of the enqueued FlowFile
+         * @param regexPattern       value assigned to the {@code ATTRIBUTE_REGEX} property
+         * @param expectedAttributes names of the attributes that must still be present with their input values;
+         *                           every other input attribute must be absent
+         */
+        private void runTestWith(Map<String, String> attributes, String regexPattern, Set<String> expectedAttributes) {
+            runner.setProperty(FilterAttribute.MATCHING_STRATEGY, FilterAttribute.MATCHING_STRATEGY_VALUE_REGEX);
+            runner.setProperty(FilterAttribute.ATTRIBUTE_REGEX, regexPattern);
+
+            final MockFlowFile enqueued = runner.enqueue(exampleContent, attributes);
+            final Map<String, String> inputAttributes = enqueued.getAttributes();
+            // snapshot, taken before the run, of every input attribute that is not expected to survive
+            final Set<String> notExpectedAttributes = inputAttributes.keySet().stream()
+                    .filter(name -> !expectedAttributes.contains(name))
+                    .collect(Collectors.toSet());
+
+            runner.run();
+
+            runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, 1);
+            final MockFlowFile result = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS).get(0);
+            result.assertContentEquals(exampleContent);
+            expectedAttributes.forEach(name -> result.assertAttributeEquals(name, inputAttributes.get(name)));
+            notExpectedAttributes.forEach(result::assertAttributeNotExists);
+        }
+    }
+
+
+    /**
+     * Runs the processor with a thread count of five over 10,000 FlowFiles and checks that every result
+     * kept its own "foo" value and lost "bar".
+     */
+    @Test
+    void supportMultiThreadedExecution() {
+        final int flowFileCount = 10_000;
+
+        runner.setThreadCount(5);
+        for (int index = 0; index < flowFileCount; index++) {
+            final String indexAsText = String.valueOf(index);
+            runner.enqueue(exampleContent, mapOf("foo", indexAsText, "bar", indexAsText));
+        }
+        runner.setProperty(FilterAttribute.ATTRIBUTE_SET, "foo");
+
+        runner.run(flowFileCount);
+
+        runner.assertAllFlowFilesTransferred(FilterAttribute.REL_SUCCESS, flowFileCount);
+        final List<MockFlowFile> results = runner.getFlowFilesForRelationship(FilterAttribute.REL_SUCCESS);
+        results.forEach(result -> {
+            result.assertAttributeExists("foo");
+            result.assertAttributeNotExists("bar");
+        });
+
+        // all "foo" values were distinct on input, so the set of output values must have the same size
+        final Set<String> distinctFooValues = results.stream()
+                .map(result -> result.getAttribute("foo"))
+                .collect(Collectors.toSet());
+        assertEquals(flowFileCount, distinctFooValues.size());
+    }
+
+    /**
+     * Builds a mutable map from alternating keys and values.
+     *
+     * @param keyValues key1, value1, key2, value2, ...
+     * @return a new map holding the given pairs; a later duplicate key overwrites an earlier one
+     * @throws IllegalArgumentException if the number of arguments is odd, i.e. a key has no value
+     */
+    private static Map<String, String> mapOf(String... keyValues) {
+        // fail fast instead of silently dropping a trailing key that has no value
+        if (keyValues.length % 2 != 0) {
+            throw new IllegalArgumentException(
+                    "Expected an even number of arguments (key/value pairs) but got " + keyValues.length);
+        }
+
+        final HashMap<String, String> map = new HashMap<>();
+
+        for (int i = 0; i < keyValues.length; i += 2) {
+            map.put(keyValues[i], keyValues[i + 1]);
+        }
+
+        return map;
+    }
+
+    /**
+     * Builds a mutable set from the given values; duplicates collapse into a single entry.
+     */
+    private static Set<String> setOf(String... values) {
+        final Set<String> set = new HashSet<>();
+        for (final String value : Arrays.asList(values)) {
+            set.add(value);
+        }
+        return set;
+    }
+}
\ No newline at end of file


Reply via email to