This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ca59d71e6 NIFI-14132 Flow Analysis Rule to check flowfile expiration 
configuration
2ca59d71e6 is described below

commit 2ca59d71e6045e5c5bea26cfa865244c9c305758
Author: Mike Moser <[email protected]>
AuthorDate: Thu Jan 9 21:01:20 2025 +0000

    NIFI-14132 Flow Analysis Rule to check flowfile expiration configuration
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #9664.
---
 .../nifi-standard-rules/pom.xml                    |   2 +
 .../flowanalysis/rules/DisallowComponentType.java  |  17 +--
 .../rules/RestrictBackpressureSettings.java        | 127 ++++++----------
 .../rules/RestrictFlowFileExpiration.java          | 168 +++++++++++++++++++++
 .../rules/util/ConnectionViolation.java            | 125 +++++++++++++++
 .../rules/util/FlowAnalysisRuleUtils.java          |  65 ++++++++
 .../flowanalysis/rules/util/ViolationType.java     |  34 +++++
 .../org.apache.nifi.flowanalysis.FlowAnalysisRule  |   1 +
 .../rules/RestrictBackpressureSettingsTest.java    |  20 ++-
 .../rules/RestrictFlowFileExpirationTest.java      |  92 +++++++++++
 .../RestrictFlowFileExpiration.json                |   1 +
 .../RestrictFlowFileExpiration_noViolation.json    |   1 +
 12 files changed, 555 insertions(+), 98 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml
index 1a24deab13..047c5810fb 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml
@@ -34,6 +34,8 @@
                     <excludes combine.children="append">
                         
<exclude>src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings_noViolation.json</exclude>
                         
<exclude>src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings.json</exclude>
+                        
<exclude>src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json</exclude>
+                        
<exclude>src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/DisallowComponentType.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/DisallowComponentType.java
index cacb5a81dd..e562a59ab5 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/DisallowComponentType.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/DisallowComponentType.java
@@ -26,9 +26,7 @@ import org.apache.nifi.flowanalysis.ComponentAnalysisResult;
 import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 
@@ -44,17 +42,13 @@ public class DisallowComponentType extends 
AbstractFlowAnalysisRule {
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
 
-    private final static List<PropertyDescriptor> propertyDescriptors;
-
-    static {
-        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
-        _propertyDescriptors.add(COMPONENT_TYPE);
-        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
-    }
+    private final static List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
+            COMPONENT_TYPE
+    );
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propertyDescriptors;
+        return PROPERTY_DESCRIPTORS;
     }
 
     @Override
@@ -63,8 +57,7 @@ public class DisallowComponentType extends 
AbstractFlowAnalysisRule {
 
         String componentType = context.getProperty(COMPONENT_TYPE).getValue();
 
-        if (component instanceof VersionedExtensionComponent) {
-            VersionedExtensionComponent versionedExtensionComponent = 
(VersionedExtensionComponent) component;
+        if (component instanceof VersionedExtensionComponent 
versionedExtensionComponent) {
 
             String encounteredComponentType = 
versionedExtensionComponent.getType();
             String encounteredSimpleComponentType = 
encounteredComponentType.substring(encounteredComponentType.lastIndexOf(".") + 
1);
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java
index d336cbfdb6..9f8094b69e 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java
@@ -21,24 +21,19 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flow.VersionedComponent;
-import org.apache.nifi.flow.VersionedConnection;
 import org.apache.nifi.flow.VersionedProcessGroup;
-import org.apache.nifi.flow.VersionedProcessor;
 import org.apache.nifi.flowanalysis.AbstractFlowAnalysisRule;
 import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
 import org.apache.nifi.flowanalysis.GroupAnalysisResult;
+import org.apache.nifi.flowanalysis.rules.util.ConnectionViolation;
+import org.apache.nifi.flowanalysis.rules.util.FlowAnalysisRuleUtils;
+import org.apache.nifi.flowanalysis.rules.util.ViolationType;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 @Tags({"connection", "backpressure"})
 @CapabilityDescription("This rule will generate a violation if backpressure 
settings of a connection exceed configured thresholds. "
@@ -125,117 +120,81 @@ public class RestrictBackpressureSettings extends 
AbstractFlowAnalysisRule {
 
     @Override
     public Collection<GroupAnalysisResult> 
analyzeProcessGroup(VersionedProcessGroup pg, FlowAnalysisRuleContext context) {
-        final Collection<GroupAnalysisResult> results = new HashSet<>();
+        final Collection<ConnectionViolation> violations = new ArrayList<>();
 
         final long minCount = context.getProperty(COUNT_MIN).asLong();
         final long maxCount = context.getProperty(COUNT_MAX).asLong();
         final double minSize = 
context.getProperty(SIZE_MIN).asDataSize(DataUnit.B);
         final double maxSize = 
context.getProperty(SIZE_MAX).asDataSize(DataUnit.B);
 
-        // Map of all id/components to generate more human readable violations
-        final Map<String, VersionedComponent> idComponent = Stream.of(
-                pg.getFunnels().stream(),
-                pg.getProcessors().stream(),
-                pg.getInputPorts().stream(),
-                pg.getOutputPorts().stream())
-                .flatMap(c -> c)
-                .collect(Collectors.toMap(c -> c.getIdentifier(), 
Function.identity()));
-
-        pg.getConnections().stream().forEach(
+        pg.getConnections().forEach(
                 connection -> {
                     if (connection.getBackPressureObjectThreshold() < 
minCount) {
-                        results.add(buildViolation(connection,
-                                
idComponent.get(connection.getSource().getId()),
-                                
idComponent.get(connection.getDestination().getId()),
+                        violations.add(new ConnectionViolation(connection,
                                 
BackpressureViolationType.BP_COUNT_THRESHOLD_BELOW_LIMIT,
-                                
getViolationMessage(BackpressureViolationType.BP_COUNT_THRESHOLD_BELOW_LIMIT, 
connection.getBackPressureObjectThreshold().toString(), 
Long.toString(minCount))));
+                                this.getClass().getSimpleName(),
+                                
connection.getBackPressureObjectThreshold().toString(),
+                                context.getProperty(COUNT_MIN).getValue()));
                     }
                     if (connection.getBackPressureObjectThreshold() > 
maxCount) {
-                        results.add(buildViolation(connection,
-                                
idComponent.get(connection.getSource().getId()),
-                                
idComponent.get(connection.getDestination().getId()),
+                        violations.add(new ConnectionViolation(connection,
                                 
BackpressureViolationType.BP_COUNT_THRESHOLD_ABOVE_LIMIT,
-                                
getViolationMessage(BackpressureViolationType.BP_COUNT_THRESHOLD_ABOVE_LIMIT, 
connection.getBackPressureObjectThreshold().toString(), 
Long.toString(maxCount))));
+                                this.getClass().getSimpleName(),
+                                
connection.getBackPressureObjectThreshold().toString(),
+                                context.getProperty(COUNT_MAX).getValue()));
                     }
                     final double sizeThreshold = 
DataUnit.parseDataSize(connection.getBackPressureDataSizeThreshold(), 
DataUnit.B);
                     if (Double.compare(sizeThreshold, minSize) < 0) {
-                        results.add(buildViolation(connection,
-                                
idComponent.get(connection.getSource().getId()),
-                                
idComponent.get(connection.getDestination().getId()),
+                        violations.add(new ConnectionViolation(connection,
                                 
BackpressureViolationType.BP_SIZE_THRESHOLD_BELOW_LIMIT,
-                                
getViolationMessage(BackpressureViolationType.BP_SIZE_THRESHOLD_BELOW_LIMIT, 
connection.getBackPressureDataSizeThreshold(), 
context.getProperty(SIZE_MIN).getValue())));
+                                this.getClass().getSimpleName(),
+                                connection.getBackPressureDataSizeThreshold(),
+                                context.getProperty(SIZE_MIN).getValue()));
                     }
                     if (Double.compare(sizeThreshold, maxSize) > 0) {
-                        results.add(buildViolation(connection,
-                                
idComponent.get(connection.getSource().getId()),
-                                
idComponent.get(connection.getDestination().getId()),
+                        violations.add(new ConnectionViolation(connection,
                                 
BackpressureViolationType.BP_SIZE_THRESHOLD_ABOVE_LIMIT,
-                                
getViolationMessage(BackpressureViolationType.BP_SIZE_THRESHOLD_ABOVE_LIMIT, 
connection.getBackPressureDataSizeThreshold(), 
context.getProperty(SIZE_MAX).getValue())));
+                                this.getClass().getSimpleName(),
+                                connection.getBackPressureDataSizeThreshold(),
+                                context.getProperty(SIZE_MAX).getValue()));
                     }
                 }
             );
 
-        return results;
-    }
-
-    private GroupAnalysisResult buildViolation(final VersionedConnection 
connection, final VersionedComponent source,
-            final VersionedComponent destination, final 
BackpressureViolationType backpressureViolationType, final String 
violationMessage) {
-        // The reason why we want the violation to be on the processor when we 
have one
-        // (either as source or destination) is because in case the rule is 
"enforced"
-        // we want the corresponding component to be invalid. If this is not a 
processor
-        // (funnel, process group, etc) we cannot make it invalid so we put the
-        // violation on the connection itself.
-        if (!(source instanceof VersionedProcessor) && !(destination 
instanceof VersionedProcessor)) {
-            // connection between two components that are not processors and 
cannot be invalid, setting violation on connection
-            return GroupAnalysisResult.forComponent(connection,
-                    connection.getIdentifier() + "_" + 
backpressureViolationType.getId(),
-                    getLocationMessage(connection, source, destination) + 
violationMessage).build();
-        } else if (source instanceof VersionedProcessor) {
-            // defining violation on source processor
-            return GroupAnalysisResult.forComponent(source,
-                    connection.getIdentifier() + "_" + 
backpressureViolationType.getId(),
-                    getLocationMessage(connection, source, destination) + 
violationMessage).build();
-        } else {
-            // defining violation on destination processor
-            return GroupAnalysisResult.forComponent(destination,
-                    connection.getIdentifier() + "_" + 
backpressureViolationType.getId(),
-                    getLocationMessage(connection, source, destination) + 
violationMessage).build();
-        }
-    }
-
-    private String getLocationMessage(final VersionedConnection connection, 
final VersionedComponent source, final VersionedComponent destination) {
-        if (source == null || destination == null) {
-            return "The connection [" + connection.getIdentifier() + "] is 
violating the rule for backpressure settings. ";
-        }
-        return "The connection [" + connection.getIdentifier() + "] connecting 
" + source.getName() + " [" + source.getIdentifier() + "] to "
-                + destination.getName() + " [" + destination.getIdentifier() + 
"] is violating the rule for backpressure settings. ";
-    }
-
-    private String getViolationMessage(final BackpressureViolationType 
backpressureViolationType, final String configured, final String limit) {
-        return switch (backpressureViolationType) {
-        case BP_COUNT_THRESHOLD_ABOVE_LIMIT -> "The connection is configured 
with a Backpressure Count Threshold of " + configured + " and it should be 
lesser or equal than " + limit + ".";
-        case BP_COUNT_THRESHOLD_BELOW_LIMIT -> "The connection is configured 
with a Backpressure Count Threshold of " + configured + " and it should be 
greater or equal than " + limit + ".";
-        case BP_SIZE_THRESHOLD_ABOVE_LIMIT -> "The connection is configured 
with a Backpressure Data Size Threshold of " + configured + " and it should be 
lesser or equal than " + limit + ".";
-        case BP_SIZE_THRESHOLD_BELOW_LIMIT -> "The connection is configured 
with a Backpressure Data Size Threshold of " + configured + " and it should be 
greater or equal than " + limit + ".";
-        };
+        return FlowAnalysisRuleUtils.convertToGroupAnalysisResults(pg, 
violations);
     }
 
-    private enum BackpressureViolationType {
+    private enum BackpressureViolationType implements ViolationType {
 
-        BP_COUNT_THRESHOLD_BELOW_LIMIT("BackpressureCountThresholdTooLow"),
-        BP_COUNT_THRESHOLD_ABOVE_LIMIT("BackpressureCountThresholdTooHigh"),
-        BP_SIZE_THRESHOLD_BELOW_LIMIT("BackpressureSizeThresholdTooLow"),
-        BP_SIZE_THRESHOLD_ABOVE_LIMIT("BackpressureSizeThresholdTooHigh");
+        BP_COUNT_THRESHOLD_BELOW_LIMIT("BackpressureCountThresholdTooLow", 
"Back Pressure Count Threshold", "cannot be less than"),
+        BP_COUNT_THRESHOLD_ABOVE_LIMIT("BackpressureCountThresholdTooHigh", 
"Back Pressure Count Threshold", "cannot be greater than"),
+        BP_SIZE_THRESHOLD_BELOW_LIMIT("BackpressureSizeThresholdTooLow", "Back 
Pressure Data Size Threshold", "cannot be less than"),
+        BP_SIZE_THRESHOLD_ABOVE_LIMIT("BackpressureSizeThresholdTooHigh", 
"Back Pressure Data Size Threshold", "cannot be greater than");
 
         private final String id;
+        private final String configurationItem;
+        private final String violationMessage;
 
-        BackpressureViolationType(String id) {
+        BackpressureViolationType(String id, String configurationItem, String 
violationMessage) {
             this.id = id;
+            this.configurationItem = configurationItem;
+            this.violationMessage = violationMessage;
         }
 
+        @Override
         public String getId() {
             return this.id;
         }
 
+        @Override
+        public String getConfigurationItem() {
+            return this.configurationItem;
+        }
+
+        @Override
+        public String getViolationMessage() {
+            return this.violationMessage;
+        }
+
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpiration.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpiration.java
new file mode 100644
index 0000000000..41046a25b5
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpiration.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flowanalysis.AbstractFlowAnalysisRule;
+import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
+import org.apache.nifi.flowanalysis.GroupAnalysisResult;
+import org.apache.nifi.flowanalysis.rules.util.ConnectionViolation;
+import org.apache.nifi.flowanalysis.rules.util.FlowAnalysisRuleUtils;
+import org.apache.nifi.flowanalysis.rules.util.ViolationType;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.FormatUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"connection", "expiration", "age"})
+@CapabilityDescription("This rule will generate a violation if FlowFile 
expiration settings of a connection exceed configured thresholds. "
+        + "Improper configuration of FlowFile expiration settings can cause 
files to be deleted unexpectedly and can cause the content "
+        + "repository to fill up.")
+public class RestrictFlowFileExpiration extends AbstractFlowAnalysisRule {
+    public static final PropertyDescriptor ALLOW_ZERO = new 
PropertyDescriptor.Builder()
+            .name("Allow Zero Expiration")
+            .description("If set to true, a 0 second FlowFile Expiration on 
connections is allowed despite other configured restrictions."
+                    + " If set to false, a 0 second FlowFile Expiration will 
be compared against the other configured restrictions."
+                    + " This can be used to prevent a user from setting a 
value of 0 seconds which could fill up"
+                    + " the content repository if files accumulate in front of 
stopped processors.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MIN_FLOWFILE_EXPIRATION = new 
PropertyDescriptor.Builder()
+            .name("Minimum FlowFile Expiration")
+            .description("This is the minimum value that should be set for the 
FlowFile Expiration setting on connections."
+                    + " This can be used to prevent a user from setting a very 
small expiration which can cause files to be"
+                    + " deleted unexpectedly.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("1 min")
+            .build();
+
+    public static final PropertyDescriptor MAX_FLOWFILE_EXPIRATION = new 
PropertyDescriptor.Builder()
+            .name("Maximum FlowFile Expiration")
+            .description("This is the maximum value that should be set for the 
FlowFile Expiration setting on connections."
+                    + " This can be used to prevent a user from setting a 
large expiration which could fill up the content"
+                    + " repository if files accumulate in front of stopped 
processors.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 days")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
+            ALLOW_ZERO,
+            MIN_FLOWFILE_EXPIRATION,
+            MAX_FLOWFILE_EXPIRATION
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final long minSize = 
validationContext.getProperty(MIN_FLOWFILE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maxSize = 
validationContext.getProperty(MAX_FLOWFILE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS);
+
+        if (minSize > maxSize) {
+            results.add(
+                    new ValidationResult.Builder()
+                            .subject(MIN_FLOWFILE_EXPIRATION.getName())
+                            .valid(false)
+                            .explanation("Value of '" + 
MIN_FLOWFILE_EXPIRATION.getName() + "' cannot be greater than '" + 
MAX_FLOWFILE_EXPIRATION.getName() + "'")
+                            .build());
+        }
+
+        return results;
+    }
+
+    @Override
+    public Collection<GroupAnalysisResult> 
analyzeProcessGroup(VersionedProcessGroup pg, FlowAnalysisRuleContext context) {
+        final Collection<ConnectionViolation> violations = new ArrayList<>();
+
+        final boolean allowZero = context.getProperty(ALLOW_ZERO).asBoolean();
+        final long minSize = 
context.getProperty(MIN_FLOWFILE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maxSize = 
context.getProperty(MAX_FLOWFILE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS);
+
+        pg.getConnections().forEach(connection -> {
+            final long connectionExpiration = 
FormatUtils.getTimeDuration(connection.getFlowFileExpiration(), 
TimeUnit.MILLISECONDS);
+
+            if (connectionExpiration != 0 || !allowZero) {
+                if (connectionExpiration < minSize) {
+                    violations.add(new ConnectionViolation(connection,
+                            ExpirationViolationType.EXPIRATION_BELOW_MINIMUM,
+                            this.getClass().getSimpleName(),
+                            connection.getFlowFileExpiration(),
+                            
context.getProperty(MIN_FLOWFILE_EXPIRATION).getValue()));
+                }
+                if (connectionExpiration > maxSize) {
+                    violations.add(new ConnectionViolation(connection,
+                            ExpirationViolationType.EXPIRATION_ABOVE_MAXIMUM,
+                            this.getClass().getSimpleName(),
+                            connection.getFlowFileExpiration(),
+                            
context.getProperty(MAX_FLOWFILE_EXPIRATION).getValue()));
+                }
+            }
+        });
+
+        return FlowAnalysisRuleUtils.convertToGroupAnalysisResults(pg, 
violations);
+    }
+
+    private enum ExpirationViolationType implements ViolationType {
+
+        EXPIRATION_BELOW_MINIMUM("FlowFileExpirationTooLow", "FlowFile 
Expiration", "cannot be less than"),
+        EXPIRATION_ABOVE_MAXIMUM("FlowFileExpirationTooHigh", "FlowFile 
Expiration", "cannot be greater than");
+
+        private final String id;
+        private final String configurationItem;
+        private final String violationMessage;
+
+        ExpirationViolationType(String id, String configurationItem, String 
violationMessage) {
+            this.id = id;
+            this.configurationItem = configurationItem;
+            this.violationMessage = violationMessage;
+        }
+
+        @Override
+        public String getId() {
+            return this.id;
+        }
+
+        @Override
+        public String getConfigurationItem() {
+            return this.configurationItem;
+        }
+
+        @Override
+        public String getViolationMessage() {
+            return this.violationMessage;
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ConnectionViolation.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ConnectionViolation.java
new file mode 100644
index 0000000000..b756802fe8
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ConnectionViolation.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules.util;
+
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flowanalysis.GroupAnalysisResult;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.Map;
+
+public class ConnectionViolation {
+    private final VersionedConnection connection;
+    private final ViolationType violationType;
+    private final String rule;
+    private final String configuredValue;
+    private final String ruleLimit;
+
+    public ConnectionViolation(VersionedConnection connection,
+                               ViolationType violationType,
+                               String rule,
+                               String configuredValue,
+                               String ruleLimit) {
+        this.connection = connection;
+        this.violationType = violationType;
+        this.rule = rule;
+        this.configuredValue = configuredValue;
+        this.ruleLimit = ruleLimit;
+    }
+
+    /**
+     * Convert this ConnectionViolation to a Flow Analysis Rule 
GroupAnalysisResult.
+     *
+     * @param components the components in a ProcessGroup that could be 
attached to this VersionedConnection.
+     * @return a GroupAnalysisResult
+     */
+    public GroupAnalysisResult convertToGroupAnalysisResult(Map<String, 
VersionedComponent> components) {
+        VersionedComponent source = 
components.get(connection.getSource().getId());
+        VersionedComponent destination = 
components.get(connection.getDestination().getId());
+        return GroupAnalysisResult.forComponent(
+                        assignConnectionViolationToComponent(connection, 
source, destination),
+                        connection.getIdentifier() + "_" + 
violationType.getId(),
+                        getConnectionLocationMessage(connection, source, 
destination, rule)
+                                + " " + getViolationReason())
+                .build();
+    }
+
+    /*
+     * Connection violations should be assigned to a connected Processor 
(either the source or destination)
+     * because in case the rule is "enforced" we want the corresponding 
component to be invalid.
+     * If neither connected component is a Processor (i.e. funnel, process 
group, port, etc.), then we cannot make
+     * it invalid so put the violation on the connection itself.
+     */
+    private VersionedComponent assignConnectionViolationToComponent(
+            final VersionedConnection connection, final VersionedComponent 
source, final VersionedComponent destination) {
+
+        if (!(source instanceof VersionedProcessor) && !(destination 
instanceof VersionedProcessor)) {
+            // for connections between two components that are not processors 
and cannot be invalid, set violation on connection
+            return connection;
+        } else if (source instanceof VersionedProcessor) {
+            // set violation on source processor
+            return source;
+        } else {
+            // set violation on destination processor
+            return destination;
+        }
+    }
+
+    /*
+     * Format a message stating what configured value violates a rule.
+     */
+    private String getViolationReason() {
+        return String.format("The connection is configured with a %s of %s 
which %s %s.",
+                violationType.getConfigurationItem(), configuredValue, 
violationType.getViolationMessage(), ruleLimit);
+    }
+
+    /*
+     * Format a rule violation message for a VersionedConnection with 
information about the source and destination VersionedComponent.
+     */
+    private String getConnectionLocationMessage(
+            final VersionedConnection connection, final VersionedComponent 
source, final VersionedComponent destination, final String rule) {
+
+        StringBuilder message = new StringBuilder();
+        message.append("The connection ");
+        if (StringUtils.isNotEmpty(connection.getName())) {
+            // Output connection name if it has been named
+            message.append(connection.getName()).append(' ');
+        } else {
+            // Output connection relationships by name
+            String relationships = String.join(",", 
connection.getSelectedRelationships());
+            if (StringUtils.isNotEmpty(relationships)) {
+                message.append(relationships).append(' ');
+            }
+        }
+        message.append("[").append(connection.getIdentifier()).append("]");
+        if (source != null) {
+            String sourceName =
+                    (source instanceof VersionedFunnel) ? "funnel" : 
source.getName();
+            message.append(" from source ").append(sourceName).append(" 
[").append(source.getIdentifier()).append("]");
+        }
+        if (destination != null) {
+            String destinationName =
+                    (destination instanceof VersionedFunnel) ? "funnel" : 
destination.getName();
+            message.append(" to destination 
").append(destinationName).append(" 
[").append(destination.getIdentifier()).append("]");
+        }
+        message.append(" violates the ").append(rule).append(" rule.");
+        return message.toString();
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/FlowAnalysisRuleUtils.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/FlowAnalysisRuleUtils.java
new file mode 100644
index 0000000000..1d9c38eed7
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/FlowAnalysisRuleUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules.util;
+
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flowanalysis.GroupAnalysisResult;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class FlowAnalysisRuleUtils {
+    /**
+     * Build a map of all id/components in a ProcessGroup so that we can 
generate more human-readable violations.
+     *
+     * @param group a ProcessGroup
+     * @return a map of VersionedComponents indexed by their id
+     */
+    public static Map<String, VersionedComponent> 
gatherComponents(VersionedProcessGroup group) {
+        return Stream.of(
+                group.getFunnels().stream(),
+                group.getProcessors().stream(),
+                group.getInputPorts().stream(),
+                group.getOutputPorts().stream())
+                .flatMap(c -> c)
+                .collect(Collectors.toMap(VersionedComponent::getIdentifier, 
Function.identity()));
+    }
+
+    /**
+     * Convert ConnectionViolations to GroupAnalysisResults.
+     *
+     * @param pg process group containing the connections
+     * @param violations flow analysis violations
+     * @return a collection of Flow Analysis Rule GroupAnalysisResult
+     */
+    public static Collection<GroupAnalysisResult> 
convertToGroupAnalysisResults(VersionedProcessGroup pg, 
Collection<ConnectionViolation> violations) {
+        if (!violations.isEmpty()) {
+            final Map<String, VersionedComponent> components = 
FlowAnalysisRuleUtils.gatherComponents(pg);
+            final Collection<GroupAnalysisResult> results = new HashSet<>();
+            violations.forEach(violation -> 
results.add(violation.convertToGroupAnalysisResult(components)));
+            return results;
+        } else {
+            return Collections.emptySet();
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ViolationType.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ViolationType.java
new file mode 100644
index 0000000000..4cb3124821
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ViolationType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules.util;
+
+public interface ViolationType {
+    /**
+     * @return a unique identifier for this type of violation
+     */
+    String getId();
+
+    /**
+     * @return the configuration setting that caused the violation
+     */
+    String getConfigurationItem();
+
+    /**
+     * @return text that explains why the configured value is in violation
+     */
+    String getViolationMessage();
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
index df6a6e35f6..f5f1df7eb4 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
@@ -15,3 +15,4 @@
 
 org.apache.nifi.flowanalysis.rules.DisallowComponentType
 org.apache.nifi.flowanalysis.rules.RestrictBackpressureSettings
+org.apache.nifi.flowanalysis.rules.RestrictFlowFileExpiration
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
index d088d85121..976250823a 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
@@ -16,10 +16,14 @@
  */
 package org.apache.nifi.flowanalysis.rules;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Collection;
 import java.util.List;
 
+import org.apache.nifi.components.ValidationResult;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -44,14 +48,26 @@ public class RestrictBackpressureSettingsTest extends 
AbstractFlowAnalaysisRuleT
     public void testWrongCountConfiguration() {
         setProperty(RestrictBackpressureSettings.COUNT_MIN, "100");
         setProperty(RestrictBackpressureSettings.COUNT_MAX, "10");
-        assertFalse(rule.customValidate(validationContext).isEmpty());
+        Collection<ValidationResult> results = 
rule.customValidate(validationContext);
+        assertEquals(1, results.size());
+        results.forEach(result -> {
+            assertFalse(result.isValid());
+            assertEquals(RestrictBackpressureSettings.COUNT_MIN.getName(), 
result.getSubject());
+            assertTrue(result.getExplanation().contains("cannot be strictly 
greater than"));
+        });
     }
 
     @Test
     public void testWrongSizeConfiguration() {
         setProperty(RestrictBackpressureSettings.SIZE_MIN, "1GB");
         setProperty(RestrictBackpressureSettings.SIZE_MAX, "1MB");
-        assertFalse(rule.customValidate(validationContext).isEmpty());
+        Collection<ValidationResult> results = 
rule.customValidate(validationContext);
+        assertEquals(1, results.size());
+        results.forEach(result -> {
+            assertFalse(result.isValid());
+            assertEquals(RestrictBackpressureSettings.SIZE_MIN.getName(), 
result.getSubject());
+            assertTrue(result.getExplanation().contains("cannot be strictly 
greater than"));
+        });
     }
 
     @Test
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpirationTest.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpirationTest.java
new file mode 100644
index 0000000000..0b62a8594b
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpirationTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RestrictFlowFileExpirationTest extends 
AbstractFlowAnalaysisRuleTest<RestrictFlowFileExpiration> {
+    @Override
+    protected RestrictFlowFileExpiration initializeRule() {
+        return new RestrictFlowFileExpiration();
+    }
+
+    @BeforeEach
+    @Override
+    public void setup() {
+        super.setup();
+        setProperty(RestrictFlowFileExpiration.ALLOW_ZERO, "true");
+        setProperty(RestrictFlowFileExpiration.MIN_FLOWFILE_EXPIRATION, 
RestrictFlowFileExpiration.MIN_FLOWFILE_EXPIRATION.getDefaultValue());
+        setProperty(RestrictFlowFileExpiration.MAX_FLOWFILE_EXPIRATION, 
RestrictFlowFileExpiration.MAX_FLOWFILE_EXPIRATION.getDefaultValue());
+    }
+
+    @Test
+    public void testBadConfiguration() {
+        setProperty(RestrictFlowFileExpiration.MIN_FLOWFILE_EXPIRATION, "2 
days");
+        setProperty(RestrictFlowFileExpiration.MAX_FLOWFILE_EXPIRATION, "1 
day");
+        Collection<ValidationResult> results = 
rule.customValidate(validationContext);
+        assertEquals(1, results.size());
+        results.forEach(result -> {
+            assertFalse(result.isValid());
+            
assertEquals(RestrictFlowFileExpiration.MIN_FLOWFILE_EXPIRATION.getName(), 
result.getSubject());
+            assertTrue(result.getExplanation().contains("cannot be greater 
than"));
+        });
+    }
+
+    @Test
+    public void testNoViolations() throws Exception {
+        testAnalyzeProcessGroup(
+                
"src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json",
+                List.of()
+        );
+    }
+
+    @Test
+    public void testViolationsAllowZeroTrue() throws Exception {
+        testAnalyzeProcessGroup(
+                
"src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json",
+                List.of(
+                        "e26f3857-0192-1000-776a-3d62e15f75dc", // connection 
from UpdateAttribute to funnel, 1 sec
+                        "e27073f8-0192-1000-cf43-9c41e69eadd2" // connection 
from output port to funnel, 45 days
+                )
+        );
+    }
+
+    @Test
+    public void testViolationsAllowZeroFalse() throws Exception {
+        setProperty(RestrictFlowFileExpiration.ALLOW_ZERO, "false");
+
+        testAnalyzeProcessGroup(
+                
"src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json",
+                List.of(
+                        "e26f20c1-0192-1000-ff8b-bcf395c02076", // 
GenerateFlowFile, GenerateFlowFile connected to UpdateAttribute, 0 sec
+                        "e26f3857-0192-1000-776a-3d62e15f75dc", // 
UpdateAttribute, UpdateAttribute connected to Funnel, 1 sec
+                        "e26f3857-0192-1000-776a-3d62e15f75dc", // 
UpdateAttribute, Funnel connected to UpdateAttribute, 0 sec
+                        "e26fd0d5-0192-1000-ee3d-f90141590475", // connection 
from Funnel to Funnel, 0 sec
+                        "e27073f8-0192-1000-cf43-9c41e69eadd2"  // connection 
from Process Group output port to Funnel, 45 days
+                )
+        );
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json
new file mode 100644
index 0000000000..7c9da29868
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json
@@ -0,0 +1 @@
+{"flowContents":{"identifier":"d846f5a8-343e-3bda-8c49-4413dde11686","instanceIdentifier":"e26effa4-0192-1000-7826-18bfabd139ac","name":"RestrictBackpressureSettings","comments":"","position":{"x":1787.6793181202202,"y":391.0950428985503},"processGroups":[{"identifier":"fb55e92a-7d8e-31b7-b45e-702050f9749d","instanceIdentifier":"e270189e-0192-1000-8aba-e274768ade7e","name":"EmbeddedProcessGroup","comments":"","position":{"x":-408.0,"y":216.0},"processGroups":[],"remoteProcessGroups":[],"
 [...]
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json
new file mode 100644
index 0000000000..1dc178ec8a
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json
@@ -0,0 +1 @@
+{"flowContents":{"identifier":"d846f5a8-343e-3bda-8c49-4413dde11686","instanceIdentifier":"e26effa4-0192-1000-7826-18bfabd139ac","name":"RestrictBackpressureSettings","comments":"","position":{"x":1787.6793181202202,"y":391.0950428985503},"processGroups":[{"identifier":"fb55e92a-7d8e-31b7-b45e-702050f9749d","instanceIdentifier":"e270189e-0192-1000-8aba-e274768ade7e","name":"EmbeddedProcessGroup","comments":"","position":{"x":-408.0,"y":216.0},"processGroups":[],"remoteProcessGroups":[],"
 [...]
\ No newline at end of file


Reply via email to