This is an automated email from the ASF dual-hosted git repository.

mosermw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 10ca3355c6 NIFI-13952 - Flow Analysis Rule to restrict backpressure 
configuration (#9476)
10ca3355c6 is described below

commit 10ca3355c6f7d5eaf53d8ae76725aba67091a48c
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Oct 31 13:55:18 2024 +0100

    NIFI-13952 - Flow Analysis Rule to restrict backpressure configuration 
(#9476)
    
    Signed-off-by: Mike Moser <[email protected]>
---
 .../nifi-standard-rules/pom.xml                    |  21 +-
 .../rules/RestrictBackpressureSettings.java        | 241 +++++++++++++++++++++
 .../org.apache.nifi.flowanalysis.FlowAnalysisRule  |   3 +-
 .../rules/AbstractFlowAnalaysisRuleTest.java       |  90 ++++++++
 .../rules/RestrictBackpressureSettingsTest.java    |  80 +++++++
 .../RestrictBackpressureSettings.json              |   1 +
 .../RestrictBackpressureSettings_noViolation.json  |   1 +
 7 files changed, 432 insertions(+), 5 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml
index f9bfa725ed..4d77a1c044 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/pom.xml
@@ -23,7 +23,20 @@
     <artifactId>nifi-standard-rules</artifactId>
     <packaging>jar</packaging>
 
-    <dependencies>
-
-    </dependencies>
-</project>
\ No newline at end of file
+    <dependencies />
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        
<exclude>src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings_noViolation.json</exclude>
+                        
<exclude>src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings.json</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java
new file mode 100644
index 0000000000..d336cbfdb6
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettings.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flowanalysis.AbstractFlowAnalysisRule;
+import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
+import org.apache.nifi.flowanalysis.GroupAnalysisResult;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Tags({"connection", "backpressure"})
+@CapabilityDescription("This rule will generate a violation if backpressure 
settings of a connection exceed configured thresholds. "
+        + "Improper configuration of backpressure settings can lead to 
decreased performance because of excessive swapping and can "
+        + "fill up the content repository with too much in-flight data.")
+public class RestrictBackpressureSettings extends AbstractFlowAnalysisRule {
+
+    public static final PropertyDescriptor COUNT_MIN = new 
PropertyDescriptor.Builder()
+            .name("Minimum Backpressure Object Count Threshold")
+            .description("This is the minimum value that should be set for the 
Object Count backpressure setting on connections. "
+                    + "This can be used to prevent a user from setting a value 
of 0 which disables backpressure based on count.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .defaultValue("1")
+            .build();
+
+    public static final PropertyDescriptor COUNT_MAX = new 
PropertyDescriptor.Builder()
+            .name("Maximum Backpressure Object Count Threshold")
+            .description("This is the maximum value that should be set for the 
Object Count backpressure setting on connections. "
+                    + "This can be used to prevent a user from setting a very 
high value that may be leading to a lot of swapping.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .defaultValue("10000")
+            .build();
+
+    public static final PropertyDescriptor SIZE_MIN = new 
PropertyDescriptor.Builder()
+            .name("Minimum Backpressure Data Size Threshold")
+            .description("This is the minimum value that should be set for the 
Data Size backpressure setting on connections. "
+                    + "This can be used to prevent a user from setting a value 
of 0 which disables backpressure based on size.")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
+    public static final PropertyDescriptor SIZE_MAX = new 
PropertyDescriptor.Builder()
+            .name("Maximum Backpressure Data Size Threshold")
+            .description("This is the maximum value that should be set for the 
Data Size backpressure setting on connections. "
+                    + "This can be used to prevent a user from setting a very 
high value that may be filling up the content repo.")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 GB")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            COUNT_MIN,
+            COUNT_MAX,
+            SIZE_MIN,
+            SIZE_MAX
+    );
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final long minCount = 
validationContext.getProperty(COUNT_MIN).asLong();
+        final long maxCount = 
validationContext.getProperty(COUNT_MAX).asLong();
+        final double minSize = 
validationContext.getProperty(SIZE_MIN).asDataSize(DataUnit.B);
+        final double maxSize = 
validationContext.getProperty(SIZE_MAX).asDataSize(DataUnit.B);
+
+        if (minCount > maxCount) {
+            results.add(
+                    new ValidationResult.Builder()
+                    .subject(COUNT_MIN.getName())
+                    .valid(false)
+                    .explanation("Value of '" + COUNT_MIN.getName() + "' 
cannot be strictly greater than '" + COUNT_MAX.getName() + "'")
+                    .build());
+        }
+        if (Double.compare(minSize, maxSize) > 0) {
+            results.add(
+                    new ValidationResult.Builder()
+                    .subject(SIZE_MIN.getName())
+                    .valid(false)
+                    .explanation("Value of '" + SIZE_MIN.getName() + "' cannot 
be strictly greater than '" + SIZE_MAX.getName() + "'")
+                    .build());
+        }
+
+        return results;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Collection<GroupAnalysisResult> 
analyzeProcessGroup(VersionedProcessGroup pg, FlowAnalysisRuleContext context) {
+        final Collection<GroupAnalysisResult> results = new HashSet<>();
+
+        final long minCount = context.getProperty(COUNT_MIN).asLong();
+        final long maxCount = context.getProperty(COUNT_MAX).asLong();
+        final double minSize = 
context.getProperty(SIZE_MIN).asDataSize(DataUnit.B);
+        final double maxSize = 
context.getProperty(SIZE_MAX).asDataSize(DataUnit.B);
+
+        // Map of all id/components to generate more human readable violations
+        final Map<String, VersionedComponent> idComponent = Stream.of(
+                pg.getFunnels().stream(),
+                pg.getProcessors().stream(),
+                pg.getInputPorts().stream(),
+                pg.getOutputPorts().stream())
+                .flatMap(c -> c)
+                .collect(Collectors.toMap(c -> c.getIdentifier(), 
Function.identity()));
+
+        pg.getConnections().stream().forEach(
+                connection -> {
+                    if (connection.getBackPressureObjectThreshold() < 
minCount) {
+                        results.add(buildViolation(connection,
+                                
idComponent.get(connection.getSource().getId()),
+                                
idComponent.get(connection.getDestination().getId()),
+                                
BackpressureViolationType.BP_COUNT_THRESHOLD_BELOW_LIMIT,
+                                
getViolationMessage(BackpressureViolationType.BP_COUNT_THRESHOLD_BELOW_LIMIT, 
connection.getBackPressureObjectThreshold().toString(), 
Long.toString(minCount))));
+                    }
+                    if (connection.getBackPressureObjectThreshold() > 
maxCount) {
+                        results.add(buildViolation(connection,
+                                
idComponent.get(connection.getSource().getId()),
+                                
idComponent.get(connection.getDestination().getId()),
+                                
BackpressureViolationType.BP_COUNT_THRESHOLD_ABOVE_LIMIT,
+                                
getViolationMessage(BackpressureViolationType.BP_COUNT_THRESHOLD_ABOVE_LIMIT, 
connection.getBackPressureObjectThreshold().toString(), 
Long.toString(maxCount))));
+                    }
+                    final double sizeThreshold = 
DataUnit.parseDataSize(connection.getBackPressureDataSizeThreshold(), 
DataUnit.B);
+                    if (Double.compare(sizeThreshold, minSize) < 0) {
+                        results.add(buildViolation(connection,
+                                
idComponent.get(connection.getSource().getId()),
+                                
idComponent.get(connection.getDestination().getId()),
+                                
BackpressureViolationType.BP_SIZE_THRESHOLD_BELOW_LIMIT,
+                                
getViolationMessage(BackpressureViolationType.BP_SIZE_THRESHOLD_BELOW_LIMIT, 
connection.getBackPressureDataSizeThreshold(), 
context.getProperty(SIZE_MIN).getValue())));
+                    }
+                    if (Double.compare(sizeThreshold, maxSize) > 0) {
+                        results.add(buildViolation(connection,
+                                
idComponent.get(connection.getSource().getId()),
+                                
idComponent.get(connection.getDestination().getId()),
+                                
BackpressureViolationType.BP_SIZE_THRESHOLD_ABOVE_LIMIT,
+                                
getViolationMessage(BackpressureViolationType.BP_SIZE_THRESHOLD_ABOVE_LIMIT, 
connection.getBackPressureDataSizeThreshold(), 
context.getProperty(SIZE_MAX).getValue())));
+                    }
+                }
+            );
+
+        return results;
+    }
+
+    private GroupAnalysisResult buildViolation(final VersionedConnection 
connection, final VersionedComponent source,
+            final VersionedComponent destination, final 
BackpressureViolationType backpressureViolationType, final String 
violationMessage) {
+        // The reason why we want the violation to be on the processor when we 
have one
+        // (either as source or destination) is because in case the rule is 
"enforced"
+        // we want the corresponding component to be invalid. If this is not a 
processor
+        // (funnel, process group, etc) we cannot make it invalid so we put the
+        // violation on the connection itself.
+        if (!(source instanceof VersionedProcessor) && !(destination 
instanceof VersionedProcessor)) {
+            // connection between two components that are not processors and 
cannot be invalid, setting violation on connection
+            return GroupAnalysisResult.forComponent(connection,
+                    connection.getIdentifier() + "_" + 
backpressureViolationType.getId(),
+                    getLocationMessage(connection, source, destination) + 
violationMessage).build();
+        } else if (source instanceof VersionedProcessor) {
+            // defining violation on source processor
+            return GroupAnalysisResult.forComponent(source,
+                    connection.getIdentifier() + "_" + 
backpressureViolationType.getId(),
+                    getLocationMessage(connection, source, destination) + 
violationMessage).build();
+        } else {
+            // defining violation on destination processor
+            return GroupAnalysisResult.forComponent(destination,
+                    connection.getIdentifier() + "_" + 
backpressureViolationType.getId(),
+                    getLocationMessage(connection, source, destination) + 
violationMessage).build();
+        }
+    }
+
+    private String getLocationMessage(final VersionedConnection connection, 
final VersionedComponent source, final VersionedComponent destination) {
+        if (source == null || destination == null) {
+            return "The connection [" + connection.getIdentifier() + "] is 
violating the rule for backpressure settings. ";
+        }
+        return "The connection [" + connection.getIdentifier() + "] connecting 
" + source.getName() + " [" + source.getIdentifier() + "] to "
+                + destination.getName() + " [" + destination.getIdentifier() + 
"] is violating the rule for backpressure settings. ";
+    }
+
+    private String getViolationMessage(final BackpressureViolationType 
backpressureViolationType, final String configured, final String limit) {
+        return switch (backpressureViolationType) {
+        case BP_COUNT_THRESHOLD_ABOVE_LIMIT -> "The connection is configured 
with a Backpressure Count Threshold of " + configured + " and it should be 
lesser or equal than " + limit + ".";
+        case BP_COUNT_THRESHOLD_BELOW_LIMIT -> "The connection is configured 
with a Backpressure Count Threshold of " + configured + " and it should be 
greater or equal than " + limit + ".";
+        case BP_SIZE_THRESHOLD_ABOVE_LIMIT -> "The connection is configured 
with a Backpressure Data Size Threshold of " + configured + " and it should be 
lesser or equal than " + limit + ".";
+        case BP_SIZE_THRESHOLD_BELOW_LIMIT -> "The connection is configured 
with a Backpressure Data Size Threshold of " + configured + " and it should be 
greater or equal than " + limit + ".";
+        };
+    }
+
+    private enum BackpressureViolationType {
+
+        BP_COUNT_THRESHOLD_BELOW_LIMIT("BackpressureCountThresholdTooLow"),
+        BP_COUNT_THRESHOLD_ABOVE_LIMIT("BackpressureCountThresholdTooHigh"),
+        BP_SIZE_THRESHOLD_BELOW_LIMIT("BackpressureSizeThresholdTooLow"),
+        BP_SIZE_THRESHOLD_ABOVE_LIMIT("BackpressureSizeThresholdTooHigh");
+
+        private final String id;
+
+        BackpressureViolationType(String id) {
+            this.id = id;
+        }
+
+        public String getId() {
+            return this.id;
+        }
+
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
index 14d82e4560..df6a6e35f6 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/main/resources/META-INF/services/org.apache.nifi.flowanalysis.FlowAnalysisRule
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.nifi.flowanalysis.rules.DisallowComponentType
\ No newline at end of file
+org.apache.nifi.flowanalysis.rules.DisallowComponentType
+org.apache.nifi.flowanalysis.rules.RestrictBackpressureSettings
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/AbstractFlowAnalaysisRuleTest.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/AbstractFlowAnalaysisRuleTest.java
new file mode 100644
index 0000000000..04b2d38acb
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/AbstractFlowAnalaysisRuleTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flowanalysis.AbstractFlowAnalysisRule;
+import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
+import org.apache.nifi.flowanalysis.GroupAnalysisResult;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.FileInputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.mockito.ArgumentMatchers.any;
+
+@ExtendWith(MockitoExtension.class)
+public abstract class AbstractFlowAnalaysisRuleTest<T extends 
AbstractFlowAnalysisRule> {
+
+    private static final ObjectMapper FLOW_MAPPER = new ObjectMapper();
+    protected Map<PropertyDescriptor, PropertyValue> properties = new 
HashMap<>();
+    protected T rule;
+
+    @Mock
+    protected FlowAnalysisRuleContext flowAnalysisRuleContext;
+
+    @Mock
+    protected ConfigurationContext configurationContext;
+
+    @Mock
+    protected ValidationContext validationContext;
+
+    protected abstract T initializeRule();
+
+    @BeforeEach
+    public void setup() {
+        rule = initializeRule();
+        
Mockito.lenient().when(flowAnalysisRuleContext.getProperty(any())).thenAnswer(invocation
 -> {
+            return properties.get(invocation.getArgument(0));
+        });
+        
Mockito.lenient().when(configurationContext.getProperty(any())).thenAnswer(invocation
 -> {
+            return properties.get(invocation.getArgument(0));
+        });
+        
Mockito.lenient().when(validationContext.getProperty(any())).thenAnswer(invocation
 -> {
+            return properties.get(invocation.getArgument(0));
+        });
+    }
+
+    protected void setProperty(PropertyDescriptor propertyDescriptor, String 
value) {
+        properties.put(propertyDescriptor, new StandardPropertyValue(value, 
null, null));
+    }
+
+    private VersionedProcessGroup getProcessGroup(String flowDefinition) 
throws Exception {
+        final RegisteredFlowSnapshot flowSnapshot = FLOW_MAPPER.readValue(new 
FileInputStream(flowDefinition), RegisteredFlowSnapshot.class);
+        return flowSnapshot.getFlowContents();
+    }
+
+    protected void testAnalyzeProcessGroup(String flowDefinition, List<String> 
expected) throws Exception {
+        final Collection<GroupAnalysisResult> actual = 
rule.analyzeProcessGroup(getProcessGroup(flowDefinition), 
flowAnalysisRuleContext);
+        assertIterableEquals(expected, actual.stream().map(r -> 
r.getComponent().get().getInstanceIdentifier()).sorted().toList());
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
new file mode 100644
index 0000000000..d088d85121
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/java/org/apache/nifi/flowanalysis/rules/RestrictBackpressureSettingsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis.rules;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class RestrictBackpressureSettingsTest extends 
AbstractFlowAnalaysisRuleTest<RestrictBackpressureSettings> {
+
+    @Override
+    protected RestrictBackpressureSettings initializeRule() {
+        return new RestrictBackpressureSettings();
+    }
+
+    @BeforeEach
+    @Override
+    public void setup() {
+        super.setup();
+        setProperty(RestrictBackpressureSettings.COUNT_MIN, 
RestrictBackpressureSettings.COUNT_MIN.getDefaultValue());
+        setProperty(RestrictBackpressureSettings.COUNT_MAX, 
RestrictBackpressureSettings.COUNT_MAX.getDefaultValue());
+        setProperty(RestrictBackpressureSettings.SIZE_MIN, 
RestrictBackpressureSettings.SIZE_MIN.getDefaultValue());
+        setProperty(RestrictBackpressureSettings.SIZE_MAX, 
RestrictBackpressureSettings.SIZE_MAX.getDefaultValue());
+    }
+
+    @Test
+    public void testWrongCountConfiguration() {
+        setProperty(RestrictBackpressureSettings.COUNT_MIN, "100");
+        setProperty(RestrictBackpressureSettings.COUNT_MAX, "10");
+        assertFalse(rule.customValidate(validationContext).isEmpty());
+    }
+
+    @Test
+    public void testWrongSizeConfiguration() {
+        setProperty(RestrictBackpressureSettings.SIZE_MIN, "1GB");
+        setProperty(RestrictBackpressureSettings.SIZE_MAX, "1MB");
+        assertFalse(rule.customValidate(validationContext).isEmpty());
+    }
+
+    @Test
+    public void testNoViolations() throws Exception {
+        testAnalyzeProcessGroup(
+                
"src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings_noViolation.json",
+                List.of()
+            );
+    }
+
+    @Test
+    public void testViolations() throws Exception {
+        testAnalyzeProcessGroup(
+                
"src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings.json",
+                List.of(
+                        "e26f20c1-0192-1000-ff8b-bcf395c02076",  // processor 
GenerateFlowFile connecting to UpdateAttribute
+                        "e26f3857-0192-1000-776a-3d62e15f75dc", // processor 
UpdateAttribute connecting to funnel
+                        "e26f3857-0192-1000-776a-3d62e15f75dc", // processor 
UpdateAttribute connecting from funnel
+                        "e26fd0d5-0192-1000-ee3d-f90141590475", // connection 
from funnel to funnel
+                        "e27073f8-0192-1000-cf43-9c41e69eadd2", // connection 
from output port to funnel
+                        "e270eaa4-0192-1000-0622-8f9af5319328" // connection 
from funnel to input port
+                    )
+            );
+    }
+
+}
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings.json
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings.json
new file mode 100644
index 0000000000..15403f601c
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings.json
@@ -0,0 +1 @@
+{"flowContents":{"identifier":"d846f5a8-343e-3bda-8c49-4413dde11686","instanceIdentifier":"e26effa4-0192-1000-7826-18bfabd139ac","name":"RestrictBackpressureSettings","comments":"","position":{"x":1787.6793181202202,"y":391.0950428985503},"processGroups":[{"identifier":"fb55e92a-7d8e-31b7-b45e-702050f9749d","instanceIdentifier":"e270189e-0192-1000-8aba-e274768ade7e","name":"EmbeddedProcessGroup","comments":"","position":{"x":-408.0,"y":216.0},"processGroups":[],"remoteProcessGroups":[],"
 [...]
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings_noViolation.json
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings_noViolation.json
new file mode 100644
index 0000000000..1dc178ec8a
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-rules/src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings_noViolation.json
@@ -0,0 +1 @@
+{"flowContents":{"identifier":"d846f5a8-343e-3bda-8c49-4413dde11686","instanceIdentifier":"e26effa4-0192-1000-7826-18bfabd139ac","name":"RestrictBackpressureSettings","comments":"","position":{"x":1787.6793181202202,"y":391.0950428985503},"processGroups":[{"identifier":"fb55e92a-7d8e-31b7-b45e-702050f9749d","instanceIdentifier":"e270189e-0192-1000-8aba-e274768ade7e","name":"EmbeddedProcessGroup","comments":"","position":{"x":-408.0,"y":216.0},"processGroups":[],"remoteProcessGroups":[],"
 [...]
\ No newline at end of file

Reply via email to