This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2ca59d71e6 NIFI-14132 Flow Analysis Rule to check flowfile expiration configuration
2ca59d71e6 is described below
commit 2ca59d71e6045e5c5bea26cfa865244c9c305758
Author: Mike Moser <[email protected]>
AuthorDate: Thu Jan 9 21:01:20 2025 +0000
NIFI-14132 Flow Analysis Rule to check flowfile expiration configuration
Signed-off-by: Pierre Villard <[email protected]>
This closes #9664.
---
.../nifi-standard-rules/pom.xml | 2 +
.../flowanalysis/rules/DisallowComponentType.java | 17 +--
.../rules/RestrictBackpressureSettings.java | 127 ++++++----------
.../rules/RestrictFlowFileExpiration.java | 168 +++++++++++++++++++++
.../rules/util/ConnectionViolation.java | 125 +++++++++++++++
.../rules/util/FlowAnalysisRuleUtils.java | 65 ++++++++
.../flowanalysis/rules/util/ViolationType.java | 34 +++++
.../org.apache.nifi.flowanalysis.FlowAnalysisRule | 1 +
.../rules/RestrictBackpressureSettingsTest.java | 20 ++-
.../rules/RestrictFlowFileExpirationTest.java | 92 +++++++++++
.../RestrictFlowFileExpiration.json | 1 +
.../RestrictFlowFileExpiration_noViolation.json | 1 +
12 files changed, 555 insertions(+), 98 deletions(-)
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml
index 1a24deab13..047c5810fb 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml
@@ -34,6 +34,8 @@
<excludes combine.children="append">
<exclude>src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings_noViolation.json</exclude>
<exclude>src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings.json</exclude>
+ <exclude>src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json</exclude>
+ <exclude>src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/DisallowComponentType.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/DisallowComponentType.java
index cacb5a81dd..e562a59ab5 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/DisallowComponentType.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/DisallowComponentType.java
@@ -26,9 +26,7 @@ import org.apache.nifi.flowanalysis.ComponentAnalysisResult;
import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
import org.apache.nifi.processor.util.StandardValidators;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -44,17 +42,13 @@ public class DisallowComponentType extends AbstractFlowAnalysisRule {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
- private final static List<PropertyDescriptor> propertyDescriptors;
-
- static {
- List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
- _propertyDescriptors.add(COMPONENT_TYPE);
- propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
- }
+ private final static List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
+ COMPONENT_TYPE
+ );
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propertyDescriptors;
+ return PROPERTY_DESCRIPTORS;
}
@Override
@@ -63,8 +57,7 @@ public class DisallowComponentType extends AbstractFlowAnalysisRule {
String componentType = context.getProperty(COMPONENT_TYPE).getValue();
- if (component instanceof VersionedExtensionComponent) {
- VersionedExtensionComponent versionedExtensionComponent =
(VersionedExtensionComponent) component;
+ if (component instanceof VersionedExtensionComponent
versionedExtensionComponent) {
String encounteredComponentType =
versionedExtensionComponent.getType();
String encounteredSimpleComponentType =
encounteredComponentType.substring(encounteredComponentType.lastIndexOf(".") +
1);
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java
index d336cbfdb6..9f8094b69e 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java
@@ -21,24 +21,19 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flow.VersionedComponent;
-import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedProcessGroup;
-import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flowanalysis.AbstractFlowAnalysisRule;
import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
import org.apache.nifi.flowanalysis.GroupAnalysisResult;
+import org.apache.nifi.flowanalysis.rules.util.ConnectionViolation;
+import org.apache.nifi.flowanalysis.rules.util.FlowAnalysisRuleUtils;
+import org.apache.nifi.flowanalysis.rules.util.ViolationType;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
@Tags({"connection", "backpressure"})
@CapabilityDescription("This rule will generate a violation if backpressure
settings of a connection exceed configured thresholds. "
@@ -125,117 +120,81 @@ public class RestrictBackpressureSettings extends AbstractFlowAnalysisRule {
@Override
public Collection<GroupAnalysisResult>
analyzeProcessGroup(VersionedProcessGroup pg, FlowAnalysisRuleContext context) {
- final Collection<GroupAnalysisResult> results = new HashSet<>();
+ final Collection<ConnectionViolation> violations = new ArrayList<>();
final long minCount = context.getProperty(COUNT_MIN).asLong();
final long maxCount = context.getProperty(COUNT_MAX).asLong();
final double minSize =
context.getProperty(SIZE_MIN).asDataSize(DataUnit.B);
final double maxSize =
context.getProperty(SIZE_MAX).asDataSize(DataUnit.B);
- // Map of all id/components to generate more human readable violations
- final Map<String, VersionedComponent> idComponent = Stream.of(
- pg.getFunnels().stream(),
- pg.getProcessors().stream(),
- pg.getInputPorts().stream(),
- pg.getOutputPorts().stream())
- .flatMap(c -> c)
- .collect(Collectors.toMap(c -> c.getIdentifier(),
Function.identity()));
-
- pg.getConnections().stream().forEach(
+ pg.getConnections().forEach(
connection -> {
if (connection.getBackPressureObjectThreshold() <
minCount) {
- results.add(buildViolation(connection,
- idComponent.get(connection.getSource().getId()),
- idComponent.get(connection.getDestination().getId()),
+ violations.add(new ConnectionViolation(connection,
BackpressureViolationType.BP_COUNT_THRESHOLD_BELOW_LIMIT,
- getViolationMessage(BackpressureViolationType.BP_COUNT_THRESHOLD_BELOW_LIMIT, connection.getBackPressureObjectThreshold().toString(), Long.toString(minCount))));
+ this.getClass().getSimpleName(),
+ connection.getBackPressureObjectThreshold().toString(),
+ context.getProperty(COUNT_MIN).getValue()));
}
if (connection.getBackPressureObjectThreshold() >
maxCount) {
- results.add(buildViolation(connection,
- idComponent.get(connection.getSource().getId()),
- idComponent.get(connection.getDestination().getId()),
+ violations.add(new ConnectionViolation(connection,
BackpressureViolationType.BP_COUNT_THRESHOLD_ABOVE_LIMIT,
- getViolationMessage(BackpressureViolationType.BP_COUNT_THRESHOLD_ABOVE_LIMIT, connection.getBackPressureObjectThreshold().toString(), Long.toString(maxCount))));
+ this.getClass().getSimpleName(),
+ connection.getBackPressureObjectThreshold().toString(),
+ context.getProperty(COUNT_MAX).getValue()));
}
final double sizeThreshold =
DataUnit.parseDataSize(connection.getBackPressureDataSizeThreshold(),
DataUnit.B);
if (Double.compare(sizeThreshold, minSize) < 0) {
- results.add(buildViolation(connection,
- idComponent.get(connection.getSource().getId()),
- idComponent.get(connection.getDestination().getId()),
+ violations.add(new ConnectionViolation(connection,
BackpressureViolationType.BP_SIZE_THRESHOLD_BELOW_LIMIT,
- getViolationMessage(BackpressureViolationType.BP_SIZE_THRESHOLD_BELOW_LIMIT, connection.getBackPressureDataSizeThreshold(), context.getProperty(SIZE_MIN).getValue())));
+ this.getClass().getSimpleName(),
+ connection.getBackPressureDataSizeThreshold(),
+ context.getProperty(SIZE_MIN).getValue()));
}
if (Double.compare(sizeThreshold, maxSize) > 0) {
- results.add(buildViolation(connection,
- idComponent.get(connection.getSource().getId()),
- idComponent.get(connection.getDestination().getId()),
+ violations.add(new ConnectionViolation(connection,
BackpressureViolationType.BP_SIZE_THRESHOLD_ABOVE_LIMIT,
- getViolationMessage(BackpressureViolationType.BP_SIZE_THRESHOLD_ABOVE_LIMIT, connection.getBackPressureDataSizeThreshold(), context.getProperty(SIZE_MAX).getValue())));
+ this.getClass().getSimpleName(),
+ connection.getBackPressureDataSizeThreshold(),
+ context.getProperty(SIZE_MAX).getValue()));
}
}
);
- return results;
- }
-
- private GroupAnalysisResult buildViolation(final VersionedConnection
connection, final VersionedComponent source,
- final VersionedComponent destination, final
BackpressureViolationType backpressureViolationType, final String
violationMessage) {
- // The reason why we want the violation to be on the processor when we
have one
- // (either as source or destination) is because in case the rule is
"enforced"
- // we want the corresponding component to be invalid. If this is not a
processor
- // (funnel, process group, etc) we cannot make it invalid so we put the
- // violation on the connection itself.
- if (!(source instanceof VersionedProcessor) && !(destination
instanceof VersionedProcessor)) {
- // connection between two components that are not processors and
cannot be invalid, setting violation on connection
- return GroupAnalysisResult.forComponent(connection,
- connection.getIdentifier() + "_" +
backpressureViolationType.getId(),
- getLocationMessage(connection, source, destination) +
violationMessage).build();
- } else if (source instanceof VersionedProcessor) {
- // defining violation on source processor
- return GroupAnalysisResult.forComponent(source,
- connection.getIdentifier() + "_" +
backpressureViolationType.getId(),
- getLocationMessage(connection, source, destination) +
violationMessage).build();
- } else {
- // defining violation on destination processor
- return GroupAnalysisResult.forComponent(destination,
- connection.getIdentifier() + "_" +
backpressureViolationType.getId(),
- getLocationMessage(connection, source, destination) +
violationMessage).build();
- }
- }
-
- private String getLocationMessage(final VersionedConnection connection,
final VersionedComponent source, final VersionedComponent destination) {
- if (source == null || destination == null) {
- return "The connection [" + connection.getIdentifier() + "] is
violating the rule for backpressure settings. ";
- }
- return "The connection [" + connection.getIdentifier() + "] connecting
" + source.getName() + " [" + source.getIdentifier() + "] to "
- + destination.getName() + " [" + destination.getIdentifier() +
"] is violating the rule for backpressure settings. ";
- }
-
- private String getViolationMessage(final BackpressureViolationType
backpressureViolationType, final String configured, final String limit) {
- return switch (backpressureViolationType) {
- case BP_COUNT_THRESHOLD_ABOVE_LIMIT -> "The connection is configured
with a Backpressure Count Threshold of " + configured + " and it should be
lesser or equal than " + limit + ".";
- case BP_COUNT_THRESHOLD_BELOW_LIMIT -> "The connection is configured
with a Backpressure Count Threshold of " + configured + " and it should be
greater or equal than " + limit + ".";
- case BP_SIZE_THRESHOLD_ABOVE_LIMIT -> "The connection is configured
with a Backpressure Data Size Threshold of " + configured + " and it should be
lesser or equal than " + limit + ".";
- case BP_SIZE_THRESHOLD_BELOW_LIMIT -> "The connection is configured
with a Backpressure Data Size Threshold of " + configured + " and it should be
greater or equal than " + limit + ".";
- };
+ return FlowAnalysisRuleUtils.convertToGroupAnalysisResults(pg,
violations);
}
- private enum BackpressureViolationType {
+ private enum BackpressureViolationType implements ViolationType {
- BP_COUNT_THRESHOLD_BELOW_LIMIT("BackpressureCountThresholdTooLow"),
- BP_COUNT_THRESHOLD_ABOVE_LIMIT("BackpressureCountThresholdTooHigh"),
- BP_SIZE_THRESHOLD_BELOW_LIMIT("BackpressureSizeThresholdTooLow"),
- BP_SIZE_THRESHOLD_ABOVE_LIMIT("BackpressureSizeThresholdTooHigh");
+ BP_COUNT_THRESHOLD_BELOW_LIMIT("BackpressureCountThresholdTooLow",
"Back Pressure Count Threshold", "cannot be less than"),
+ BP_COUNT_THRESHOLD_ABOVE_LIMIT("BackpressureCountThresholdTooHigh",
"Back Pressure Count Threshold", "cannot be greater than"),
+ BP_SIZE_THRESHOLD_BELOW_LIMIT("BackpressureSizeThresholdTooLow", "Back
Pressure Data Size Threshold", "cannot be less than"),
+ BP_SIZE_THRESHOLD_ABOVE_LIMIT("BackpressureSizeThresholdTooHigh",
"Back Pressure Data Size Threshold", "cannot be greater than");
private final String id;
+ private final String configurationItem;
+ private final String violationMessage;
- BackpressureViolationType(String id) {
+ BackpressureViolationType(String id, String configurationItem, String
violationMessage) {
this.id = id;
+ this.configurationItem = configurationItem;
+ this.violationMessage = violationMessage;
}
+ @Override
public String getId() {
return this.id;
}
+ @Override
+ public String getConfigurationItem() {
+ return this.configurationItem;
+ }
+
+ @Override
+ public String getViolationMessage() {
+ return this.violationMessage;
+ }
+
}
}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpiration.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpiration.java
new file mode 100644
index 0000000000..41046a25b5
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpiration.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flowanalysis.AbstractFlowAnalysisRule;
+import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
+import org.apache.nifi.flowanalysis.GroupAnalysisResult;
+import org.apache.nifi.flowanalysis.rules.util.ConnectionViolation;
+import org.apache.nifi.flowanalysis.rules.util.FlowAnalysisRuleUtils;
+import org.apache.nifi.flowanalysis.rules.util.ViolationType;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.FormatUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"connection", "expiration", "age"})
+@CapabilityDescription("This rule will generate a violation if FlowFile
expiration settings of a connection exceed configured thresholds. "
+ + "Improper configuration of FlowFile expiration settings can cause
files to be deleted unexpectedly and can cause the content "
+ + "repository to fill up.")
+public class RestrictFlowFileExpiration extends AbstractFlowAnalysisRule {
+ public static final PropertyDescriptor ALLOW_ZERO = new
PropertyDescriptor.Builder()
+ .name("Allow Zero Expiration")
+ .description("If set to true, a 0 second FlowFile Expiration on
connections is allowed despite other configured restrictions."
+ + " If set to false, a 0 second FlowFile Expiration will
be compared against the other configured restrictions."
+ + " This can be used to prevent a user from setting a
value of 0 seconds which could fill up"
+ + " the content repository if files accumulate in front of
stopped processors.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MIN_FLOWFILE_EXPIRATION = new
PropertyDescriptor.Builder()
+ .name("Minimum FlowFile Expiration")
+ .description("This is the minimum value that should be set for the
FlowFile Expiration setting on connections."
+ + " This can be used to prevent a user from setting a very
small expiration which can cause files to be"
+ + " deleted unexpectedly.")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("1 min")
+ .build();
+
+ public static final PropertyDescriptor MAX_FLOWFILE_EXPIRATION = new
PropertyDescriptor.Builder()
+ .name("Maximum FlowFile Expiration")
+ .description("This is the maximum value that should be set for the
FlowFile Expiration setting on connections."
+ + " This can be used to prevent a user from setting a
large expiration which could fill up the content"
+ + " repository if files accumulate in front of stopped
processors.")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 days")
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
+ ALLOW_ZERO,
+ MIN_FLOWFILE_EXPIRATION,
+ MAX_FLOWFILE_EXPIRATION
+ );
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ final long minSize =
validationContext.getProperty(MIN_FLOWFILE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long maxSize =
validationContext.getProperty(MAX_FLOWFILE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS);
+
+ if (minSize > maxSize) {
+ results.add(
+ new ValidationResult.Builder()
+ .subject(MIN_FLOWFILE_EXPIRATION.getName())
+ .valid(false)
+ .explanation("Value of '" +
MIN_FLOWFILE_EXPIRATION.getName() + "' cannot be greater than '" +
MAX_FLOWFILE_EXPIRATION.getName() + "'")
+ .build());
+ }
+
+ return results;
+ }
+
+ @Override
+ public Collection<GroupAnalysisResult>
analyzeProcessGroup(VersionedProcessGroup pg, FlowAnalysisRuleContext context) {
+ final Collection<ConnectionViolation> violations = new ArrayList<>();
+
+ final boolean allowZero = context.getProperty(ALLOW_ZERO).asBoolean();
+ final long minSize =
context.getProperty(MIN_FLOWFILE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long maxSize =
context.getProperty(MAX_FLOWFILE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS);
+
+ pg.getConnections().forEach(connection -> {
+ final long connectionExpiration =
FormatUtils.getTimeDuration(connection.getFlowFileExpiration(),
TimeUnit.MILLISECONDS);
+
+ if (connectionExpiration != 0 || !allowZero) {
+ if (connectionExpiration < minSize) {
+ violations.add(new ConnectionViolation(connection,
+ ExpirationViolationType.EXPIRATION_BELOW_MINIMUM,
+ this.getClass().getSimpleName(),
+ connection.getFlowFileExpiration(),
+ context.getProperty(MIN_FLOWFILE_EXPIRATION).getValue()));
+ }
+ if (connectionExpiration > maxSize) {
+ violations.add(new ConnectionViolation(connection,
+ ExpirationViolationType.EXPIRATION_ABOVE_MAXIMUM,
+ this.getClass().getSimpleName(),
+ connection.getFlowFileExpiration(),
+ context.getProperty(MAX_FLOWFILE_EXPIRATION).getValue()));
+ }
+ }
+ });
+
+ return FlowAnalysisRuleUtils.convertToGroupAnalysisResults(pg,
violations);
+ }
+
+ private enum ExpirationViolationType implements ViolationType {
+
+ EXPIRATION_BELOW_MINIMUM("FlowFileExpirationTooLow", "FlowFile
Expiration", "cannot be less than"),
+ EXPIRATION_ABOVE_MAXIMUM("FlowFileExpirationTooHigh", "FlowFile
Expiration", "cannot be greater than");
+
+ private final String id;
+ private final String configurationItem;
+ private final String violationMessage;
+
+ ExpirationViolationType(String id, String configurationItem, String
violationMessage) {
+ this.id = id;
+ this.configurationItem = configurationItem;
+ this.violationMessage = violationMessage;
+ }
+
+ @Override
+ public String getId() {
+ return this.id;
+ }
+
+ @Override
+ public String getConfigurationItem() {
+ return this.configurationItem;
+ }
+
+ @Override
+ public String getViolationMessage() {
+ return this.violationMessage;
+ }
+ }
+}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ConnectionViolation.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ConnectionViolation.java
new file mode 100644
index 0000000000..b756802fe8
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ConnectionViolation.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules.util;
+
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flowanalysis.GroupAnalysisResult;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.Map;
+
+public class ConnectionViolation {
+ private final VersionedConnection connection;
+ private final ViolationType violationType;
+ private final String rule;
+ private final String configuredValue;
+ private final String ruleLimit;
+
+ public ConnectionViolation(VersionedConnection connection,
+ ViolationType violationType,
+ String rule,
+ String configuredValue,
+ String ruleLimit) {
+ this.connection = connection;
+ this.violationType = violationType;
+ this.rule = rule;
+ this.configuredValue = configuredValue;
+ this.ruleLimit = ruleLimit;
+ }
+
+ /**
+ * Convert this ConnectionViolation to a Flow Analysis Rule
GroupAnalysisResult.
+ *
+ * @param components the components in a ProcessGroup that could be
attached to this VersionedConnection.
+ * @return a GroupAnalysisResult
+ */
+ public GroupAnalysisResult convertToGroupAnalysisResult(Map<String,
VersionedComponent> components) {
+ VersionedComponent source =
components.get(connection.getSource().getId());
+ VersionedComponent destination =
components.get(connection.getDestination().getId());
+ return GroupAnalysisResult.forComponent(
+ assignConnectionViolationToComponent(connection,
source, destination),
+ connection.getIdentifier() + "_" +
violationType.getId(),
+ getConnectionLocationMessage(connection, source,
destination, rule)
+ + " " + getViolationReason())
+ .build();
+ }
+
+ /*
+ * Connection violations should be assigned to a connected Processor
(either the source or destination)
+ * because in case the rule is "enforced" we want the corresponding
component to be invalid.
+ * If neither connected component is a Processor (i.e. funnel, process
group, port, etc.), then we cannot make
+ * it invalid so put the violation on the connection itself.
+ */
+ private VersionedComponent assignConnectionViolationToComponent(
+ final VersionedConnection connection, final VersionedComponent
source, final VersionedComponent destination) {
+
+ if (!(source instanceof VersionedProcessor) && !(destination
instanceof VersionedProcessor)) {
+ // for connections between two components that are not processors
and cannot be invalid, set violation on connection
+ return connection;
+ } else if (source instanceof VersionedProcessor) {
+ // set violation on source processor
+ return source;
+ } else {
+ // set violation on destination processor
+ return destination;
+ }
+ }
+
+ /*
+ * Format a message stating what configured value violates a rule.
+ */
+ private String getViolationReason() {
+ return String.format("The connection is configured with a %s of %s
which %s %s.",
+ violationType.getConfigurationItem(), configuredValue,
violationType.getViolationMessage(), ruleLimit);
+ }
+
+ /*
+ * Format a rule violation message for a VersionedConnection with
information about the source and destination VersionedComponent.
+ */
+ private String getConnectionLocationMessage(
+ final VersionedConnection connection, final VersionedComponent
source, final VersionedComponent destination, final String rule) {
+
+ StringBuilder message = new StringBuilder();
+ message.append("The connection ");
+ if (StringUtils.isNotEmpty(connection.getName())) {
+ // Output connection name if it has been named
+ message.append(connection.getName()).append(' ');
+ } else {
+ // Output connection relationships by name
+ String relationships = String.join(",",
connection.getSelectedRelationships());
+ if (StringUtils.isNotEmpty(relationships)) {
+ message.append(relationships).append(' ');
+ }
+ }
+ message.append("[").append(connection.getIdentifier()).append("]");
+ if (source != null) {
+ String sourceName =
+ (source instanceof VersionedFunnel) ? "funnel" :
source.getName();
+ message.append(" from source ").append(sourceName).append("
[").append(source.getIdentifier()).append("]");
+ }
+ if (destination != null) {
+ String destinationName =
+ (destination instanceof VersionedFunnel) ? "funnel" :
destination.getName();
+ message.append(" to destination
").append(destinationName).append("
[").append(destination.getIdentifier()).append("]");
+ }
+ message.append(" violates the ").append(rule).append(" rule.");
+ return message.toString();
+ }
+}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/FlowAnalysisRuleUtils.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/FlowAnalysisRuleUtils.java
new file mode 100644
index 0000000000..1d9c38eed7
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/FlowAnalysisRuleUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules.util;
+
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flowanalysis.GroupAnalysisResult;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class FlowAnalysisRuleUtils {
+ /**
+ * Build a map of all id/components in a ProcessGroup so that we can
generate more human-readable violations.
+ *
+ * @param group a ProcessGroup
+ * @return a map of VersionedComponents indexed by their id
+ */
+ public static Map<String, VersionedComponent>
gatherComponents(VersionedProcessGroup group) {
+ return Stream.of(
+ group.getFunnels().stream(),
+ group.getProcessors().stream(),
+ group.getInputPorts().stream(),
+ group.getOutputPorts().stream())
+ .flatMap(c -> c)
+ .collect(Collectors.toMap(VersionedComponent::getIdentifier,
Function.identity()));
+ }
+
+ /**
+ * Convert ConnectionViolations to GroupAnalysisResults.
+ *
+ * @param pg process group containing the connections
+ * @param violations flow analysis violations
+ * @return a collection of Flow Analysis Rule GroupAnalysisResult
+ */
+ public static Collection<GroupAnalysisResult>
convertToGroupAnalysisResults(VersionedProcessGroup pg,
Collection<ConnectionViolation> violations) {
+ if (!violations.isEmpty()) {
+ final Map<String, VersionedComponent> components =
FlowAnalysisRuleUtils.gatherComponents(pg);
+ final Collection<GroupAnalysisResult> results = new HashSet<>();
+ violations.forEach(violation ->
results.add(violation.convertToGroupAnalysisResult(components)));
+ return results;
+ } else {
+ return Collections.emptySet();
+ }
+ }
+}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ViolationType.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ViolationType.java
new file mode 100644
index 0000000000..4cb3124821
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/util/ViolationType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules.util;
+
+/**
+ * Describes a type of violation: its identifier, the configuration setting involved and an explanatory message.
+ */
+public interface ViolationType {
+    /**
+     * Identifies this type of violation.
+     *
+     * @return a unique identifier for this type of violation
+     */
+    String getId();
+
+    /**
+     * Names the configuration setting involved.
+     *
+     * @return the configuration setting that caused the violation
+     */
+    String getConfigurationItem();
+
+    /**
+     * Explains the violation.
+     *
+     * @return text that explains why the configured value is in violation
+     */
+    String getViolationMessage();
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
index df6a6e35f6..f5f1df7eb4 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
@@ -15,3 +15,4 @@
org.apache.nifi.flowanalysis.rules.DisallowComponentType
org.apache.nifi.flowanalysis.rules.RestrictBackpressureSettings
+org.apache.nifi.flowanalysis.rules.RestrictFlowFileExpiration
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
index d088d85121..976250823a 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
@@ -16,10 +16,14 @@
*/
package org.apache.nifi.flowanalysis.rules;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Collection;
import java.util.List;
+import org.apache.nifi.components.ValidationResult;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -44,14 +48,26 @@ public class RestrictBackpressureSettingsTest extends
AbstractFlowAnalaysisRuleT
public void testWrongCountConfiguration() {
setProperty(RestrictBackpressureSettings.COUNT_MIN, "100");
setProperty(RestrictBackpressureSettings.COUNT_MAX, "10");
- assertFalse(rule.customValidate(validationContext).isEmpty());
+ Collection<ValidationResult> results = rule.customValidate(validationContext);
+ assertEquals(1, results.size());
+ results.forEach(result -> {
+ assertFalse(result.isValid());
+ assertEquals(RestrictBackpressureSettings.COUNT_MIN.getName(), result.getSubject());
+ assertTrue(result.getExplanation().contains("cannot be strictly greater than"));
+ });
}
@Test
public void testWrongSizeConfiguration() {
setProperty(RestrictBackpressureSettings.SIZE_MIN, "1GB");
setProperty(RestrictBackpressureSettings.SIZE_MAX, "1MB");
- assertFalse(rule.customValidate(validationContext).isEmpty());
+ Collection<ValidationResult> results = rule.customValidate(validationContext);
+ assertEquals(1, results.size());
+ results.forEach(result -> {
+ assertFalse(result.isValid());
+ assertEquals(RestrictBackpressureSettings.SIZE_MIN.getName(), result.getSubject());
+ assertTrue(result.getExplanation().contains("cannot be strictly greater than"));
+ });
}
@Test
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpirationTest.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpirationTest.java
new file mode 100644
index 0000000000..0b62a8594b
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictFlowFileExpirationTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules;
+
+import org.apache.nifi.components.ValidationResult;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests the RestrictFlowFileExpiration rule: property validation and analysis of the flows stored under
+ * src/test/resources/RestrictFlowFileExpiration.
+ */
+public class RestrictFlowFileExpirationTest extends AbstractFlowAnalaysisRuleTest<RestrictFlowFileExpiration> {
+    private static final String FLOW_WITH_VIOLATIONS = "src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json";
+    private static final String FLOW_WITHOUT_VIOLATIONS = "src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json";
+
+    @Override
+    protected RestrictFlowFileExpiration initializeRule() {
+        return new RestrictFlowFileExpiration();
+    }
+
+    /**
+     * Starts every test with zero expiration allowed and the default minimum/maximum expiration values.
+     */
+    @BeforeEach
+    @Override
+    public void setup() {
+        super.setup();
+        setProperty(RestrictFlowFileExpiration.ALLOW_ZERO, "true");
+        setProperty(RestrictFlowFileExpiration.MIN_FLOWFILE_EXPIRATION, RestrictFlowFileExpiration.MIN_FLOWFILE_EXPIRATION.getDefaultValue());
+        setProperty(RestrictFlowFileExpiration.MAX_FLOWFILE_EXPIRATION, RestrictFlowFileExpiration.MAX_FLOWFILE_EXPIRATION.getDefaultValue());
+    }
+
+    /**
+     * A minimum larger than the maximum must yield exactly one invalid result reported against the minimum property.
+     */
+    @Test
+    public void testBadConfiguration() {
+        setProperty(RestrictFlowFileExpiration.MIN_FLOWFILE_EXPIRATION, "2 days");
+        setProperty(RestrictFlowFileExpiration.MAX_FLOWFILE_EXPIRATION, "1 day");
+
+        final Collection<ValidationResult> validationResults = rule.customValidate(validationContext);
+        assertEquals(1, validationResults.size());
+        for (final ValidationResult validationResult : validationResults) {
+            assertFalse(validationResult.isValid());
+            assertEquals(RestrictFlowFileExpiration.MIN_FLOWFILE_EXPIRATION.getName(), validationResult.getSubject());
+            assertTrue(validationResult.getExplanation().contains("cannot be greater than"));
+        }
+    }
+
+    @Test
+    public void testNoViolations() throws Exception {
+        testAnalyzeProcessGroup(FLOW_WITHOUT_VIOLATIONS, List.of());
+    }
+
+    @Test
+    public void testViolationsAllowZeroTrue() throws Exception {
+        // Same ids as in testViolationsAllowZeroFalse, minus the entries that are only reported for 0 sec.
+        final List<String> expectedIds = List.of(
+                "e26f3857-0192-1000-776a-3d62e15f75dc", // UpdateAttribute, UpdateAttribute connected to Funnel, 1 sec
+                "e27073f8-0192-1000-cf43-9c41e69eadd2"  // connection from Process Group output port to Funnel, 45 days
+        );
+
+        testAnalyzeProcessGroup(FLOW_WITH_VIOLATIONS, expectedIds);
+    }
+
+    @Test
+    public void testViolationsAllowZeroFalse() throws Exception {
+        setProperty(RestrictFlowFileExpiration.ALLOW_ZERO, "false");
+
+        // The UpdateAttribute id is listed twice on purpose: once per offending connection.
+        final List<String> expectedIds = List.of(
+                "e26f20c1-0192-1000-ff8b-bcf395c02076", // GenerateFlowFile, GenerateFlowFile connected to UpdateAttribute, 0 sec
+                "e26f3857-0192-1000-776a-3d62e15f75dc", // UpdateAttribute, UpdateAttribute connected to Funnel, 1 sec
+                "e26f3857-0192-1000-776a-3d62e15f75dc", // UpdateAttribute, Funnel connected to UpdateAttribute, 0 sec
+                "e26fd0d5-0192-1000-ee3d-f90141590475", // connection from Funnel to Funnel, 0 sec
+                "e27073f8-0192-1000-cf43-9c41e69eadd2"  // connection from Process Group output port to Funnel, 45 days
+        );
+
+        testAnalyzeProcessGroup(FLOW_WITH_VIOLATIONS, expectedIds);
+    }
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json
new file mode 100644
index 0000000000..7c9da29868
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration.json
@@ -0,0 +1 @@
+{"flowContents":{"identifier":"d846f5a8-343e-3bda-8c49-4413dde11686","instanceIdentifier":"e26effa4-0192-1000-7826-18bfabd139ac","name":"RestrictBackpressureSettings","comments":"","position":{"x":1787.6793181202202,"y":391.0950428985503},"processGroups":[{"identifier":"fb55e92a-7d8e-31b7-b45e-702050f9749d","instanceIdentifier":"e270189e-0192-1000-8aba-e274768ade7e","name":"EmbeddedProcessGroup","comments":"","position":{"x":-408.0,"y":216.0},"processGroups":[],"remoteProcessGroups":[],"
[...]
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json
new file mode 100644
index 0000000000..1dc178ec8a
--- /dev/null
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictFlowFileExpiration/RestrictFlowFileExpiration_noViolation.json
@@ -0,0 +1 @@
+{"flowContents":{"identifier":"d846f5a8-343e-3bda-8c49-4413dde11686","instanceIdentifier":"e26effa4-0192-1000-7826-18bfabd139ac","name":"RestrictBackpressureSettings","comments":"","position":{"x":1787.6793181202202,"y":391.0950428985503},"processGroups":[{"identifier":"fb55e92a-7d8e-31b7-b45e-702050f9749d","instanceIdentifier":"e270189e-0192-1000-8aba-e274768ade7e","name":"EmbeddedProcessGroup","comments":"","position":{"x":-408.0,"y":216.0},"processGroups":[],"remoteProcessGroups":[],"
[...]
\ No newline at end of file