This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0b748e2ebb NIFI-14713 FlowAnalyzer perf improvements: - Short-circuit
getting rule violations when there are none and when we're getting violations
for the root process group by ID or alias. - Short-circuit flow analysis if
there are no flow analysis rules configured. - Changes to getRuleViolationStream
algorithm which significantly improve performance.
0b748e2ebb is described below
commit 0b748e2ebb06eeb8f3828bb913af3aa47e0a686a
Author: Eric Secules <[email protected]>
AuthorDate: Fri Jul 11 17:46:28 2025 -0700
NIFI-14713 FlowAnalyzer perf improvements:
- Short-circuit getting rule violations when there are none and when we're
getting violations for the root process group by ID or alias.
- Short-circuit flow analysis if there are no flow analysis rules configured.
- Changes to getRuleViolationStream algorithm which significantly improve
performance.
This closes #10079.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../nifi/validation/RuleViolationsManager.java | 12 ++
.../nifi/flowanalysis/StandardFlowAnalyzer.java | 3 +
.../validation/StandardRuleViolationsManager.java | 15 +++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 32 +++--
.../nifi/web/StandardNiFiServiceFacadeTest.java | 131 ++++++++++++++++++++-
5 files changed, 184 insertions(+), 9 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/validation/RuleViolationsManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/validation/RuleViolationsManager.java
index 9b7047cc9c..21ceb79047 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/validation/RuleViolationsManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/validation/RuleViolationsManager.java
@@ -58,6 +58,13 @@ public interface RuleViolationsManager {
*/
Collection<RuleViolation> getRuleViolationsForGroup(String groupId);
+ /**
+ * Returns a list of violations for all the given groupIds (non-recursive)
+ *
+ * @return Violations for all the given groupIds
+ */
+ Collection<RuleViolation> getRuleViolationsForGroups(Collection<String>
groupIds);
+
/**
* @return All current rule violations
*/
@@ -81,4 +88,9 @@ public interface RuleViolationsManager {
* Removes empty entries from the map storing the rule violations
*/
void cleanUp();
+
+ /**
+ * @return True if there are no rule violations
+ */
+ boolean isEmpty();
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java
index 1b8ca8e566..63ae900d51 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java
@@ -190,6 +190,9 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
Collection<RuleViolation> groupViolations,
Map<VersionedComponent, Collection<RuleViolation>>
componentToRuleViolations
) {
+ if (flowAnalysisRules.isEmpty()) {
+ return;
+ }
String groupId = processGroup.getIdentifier();
ComponentType processGroupComponentType =
processGroup.getComponentType();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/validation/StandardRuleViolationsManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/validation/StandardRuleViolationsManager.java
index ff1a424081..776e5d33ad 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/validation/StandardRuleViolationsManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/validation/StandardRuleViolationsManager.java
@@ -161,6 +161,16 @@ public class StandardRuleViolationsManager implements
RuleViolationsManager {
return groupViolations;
}
+ @Override
+ public Collection<RuleViolation>
getRuleViolationsForGroups(Collection<String> groupIds) {
+ Set<RuleViolation> groupViolations =
subjectIdToRuleViolation.values().stream()
+ .map(Map::values).flatMap(Collection::stream)
+ .filter(violation -> groupIds.contains(violation.getGroupId()))
+ .collect(Collectors.toSet());
+
+ return groupViolations;
+ }
+
@Override
public Collection<RuleViolation> getAllRuleViolations() {
Set<RuleViolation> allRuleViolations =
subjectIdToRuleViolation.values().stream()
@@ -189,4 +199,9 @@ public class StandardRuleViolationsManager implements
RuleViolationsManager {
public void cleanUp() {
subjectIdToRuleViolation.entrySet().removeIf(subjectIdAndViolationMap
-> subjectIdAndViolationMap.getValue().isEmpty());
}
+
+ @Override
+ public boolean isEmpty() {
+ return subjectIdToRuleViolation.isEmpty();
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 684313d038..2874a57aa8 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -6846,18 +6846,34 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
public Stream<RuleViolation> getRuleViolationStream(String processGroupId)
{
+ if (ruleViolationsManager.isEmpty()) {
+ return Stream.empty();
+ }
+
ProcessGroup processGroup =
processGroupDAO.getProcessGroup(processGroupId);
- Collection<RuleViolation> ruleViolations =
ruleViolationsManager.getRuleViolationsForGroup(processGroupId);
+ if
(processGroup.getIdentifier().equals(processGroupDAO.getProcessGroup(FlowManager.ROOT_GROUP_ID_ALIAS).getIdentifier()))
{
+ return ruleViolationsManager.getAllRuleViolations().stream();
+ } else {
- Stream<RuleViolation> ruleViolationStreamForGroupAndAllChildren =
Stream.concat(
- ruleViolations.stream(),
- processGroup.getProcessGroups().stream()
- .map(ProcessGroup::getIdentifier)
- .flatMap(this::getRuleViolationStream)
- );
+ Set<String> allIdsOfProcessGroupAndChildren = new HashSet<>();
+
+ collectGroupIdsRecursively(processGroupId,
allIdsOfProcessGroupAndChildren);
+
+ Collection<RuleViolation> ruleViolations =
ruleViolationsManager.getRuleViolationsForGroups(allIdsOfProcessGroupAndChildren);
- return ruleViolationStreamForGroupAndAllChildren;
+ return ruleViolations.stream();
+ }
+ }
+
+ private void collectGroupIdsRecursively(String processGroupId, Set<String>
allIdsOfProcessGroupAndChildren) {
+ allIdsOfProcessGroupAndChildren.add(processGroupId);
+
+ ProcessGroup processGroup =
processGroupDAO.getProcessGroup(processGroupId);
+ Set<ProcessGroup> children = processGroup.getProcessGroups();
+ for (ProcessGroup child : children) {
+ collectGroupIdsRecursively(child.getIdentifier(),
allIdsOfProcessGroupAndChildren);
+ }
}
public FlowAnalysisResultEntity
createFlowAnalysisResultEntity(Collection<RuleViolation> ruleViolations) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index 11a03223d9..2aaf8ecd1b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -1034,9 +1034,59 @@ public class StandardNiFiServiceFacadeTest {
}
+ @Test
+ public void testGetRuleViolationsForRoot() {
+ // GIVEN
+ int ruleViolationCounter = 0;
+
+ String groupId = "groupId";
+ String childGroupId = "childGroupId";
+ String grandChildGroupId = "grandChildGroupId";
+
+ RuleViolation ruleViolation1 = createRuleViolation(groupId,
ruleViolationCounter++);
+ RuleViolation ruleViolation2 = createRuleViolation(groupId,
ruleViolationCounter++);
+
+ RuleViolation childRuleViolation1 = createRuleViolation(childGroupId,
ruleViolationCounter++);
+ RuleViolation childRuleViolation2 = createRuleViolation(childGroupId,
ruleViolationCounter++);
+
+ RuleViolation grandChildRuleViolation1 =
createRuleViolation(grandChildGroupId, ruleViolationCounter++);
+ RuleViolation grandChildRuleViolation2 =
createRuleViolation(grandChildGroupId, ruleViolationCounter++);
+ RuleViolation grandChildRuleViolation3 =
createRuleViolation(grandChildGroupId, ruleViolationCounter++);
+
+ ProcessGroup grandChildProcessGroup = mockProcessGroup(
+ grandChildGroupId,
+ Collections.emptyList(),
+ Arrays.asList(grandChildRuleViolation1,
grandChildRuleViolation2, grandChildRuleViolation3)
+ );
+ ProcessGroup childProcessGroup = mockProcessGroup(
+ childGroupId,
+ Arrays.asList(grandChildProcessGroup),
+ Arrays.asList(childRuleViolation1, childRuleViolation2)
+ );
+ ProcessGroup processGroup = mockProcessGroup(
+ groupId,
+ Arrays.asList(childProcessGroup),
+ Arrays.asList(ruleViolation1, ruleViolation2)
+ );
+
+ Collection<RuleViolation> expected = new HashSet<>(Arrays.asList(
+ ruleViolation1, ruleViolation2,
+ childRuleViolation1, childRuleViolation2,
+ grandChildRuleViolation1, grandChildRuleViolation2,
grandChildRuleViolation3
+ ));
+
+
when(processGroupDAO.getProcessGroup(FlowManager.ROOT_GROUP_ID_ALIAS)).thenReturn(processGroup);
+
when(ruleViolationsManager.getAllRuleViolations()).thenReturn(expected);
+
+ // WHEN
+ Collection<RuleViolation> actual =
serviceFacade.getRuleViolationStream(processGroup.getIdentifier()).collect(Collectors.toSet());
+
+ // THEN
+ assertEquals(expected, actual);
+ }
@Test
- public void testGetRuleViolationsForGroupIsRecursive() throws Exception {
+ public void testGetRuleViolationsForRootWithAlias() {
// GIVEN
int ruleViolationCounter = 0;
@@ -1076,6 +1126,32 @@ public class StandardNiFiServiceFacadeTest {
grandChildRuleViolation1, grandChildRuleViolation2,
grandChildRuleViolation3
));
+
when(processGroupDAO.getProcessGroup(FlowManager.ROOT_GROUP_ID_ALIAS)).thenReturn(processGroup);
+
when(ruleViolationsManager.getAllRuleViolations()).thenReturn(expected);
+
+ // WHEN
+ Collection<RuleViolation> actual =
serviceFacade.getRuleViolationStream(FlowManager.ROOT_GROUP_ID_ALIAS).collect(Collectors.toSet());
+
+ // THEN
+ assertEquals(expected, actual);
+ }
+
+
+ @Test
+ public void testGetRuleViolationsEmpty() {
+ // GIVEN
+ String groupId = "groupId";
+
+ ProcessGroup processGroup = mockProcessGroup(
+ groupId,
+ Arrays.asList(),
+ Arrays.asList()
+ );
+
+ Collection<RuleViolation> expected = new HashSet<>(List.of());
+
+ when(ruleViolationsManager.isEmpty()).thenReturn(true);
+
// WHEN
Collection<RuleViolation> actual =
serviceFacade.getRuleViolationStream(processGroup.getIdentifier()).collect(Collectors.toSet());
@@ -1083,6 +1159,59 @@ public class StandardNiFiServiceFacadeTest {
assertEquals(expected, actual);
}
+ @Test
+ public void testGetRuleViolationsForGroupIsRecursive() {
+ // GIVEN
+ int ruleViolationCounter = 0;
+
+ String rootGroupId = "groupId";
+ String childGroupId = "childGroupId";
+ String grandChildGroupId = "grandChildGroupId";
+
+ RuleViolation ruleViolation1 = createRuleViolation(rootGroupId,
ruleViolationCounter++);
+ RuleViolation ruleViolation2 = createRuleViolation(rootGroupId,
ruleViolationCounter++);
+
+ RuleViolation childRuleViolation1 = createRuleViolation(childGroupId,
ruleViolationCounter++);
+ RuleViolation childRuleViolation2 = createRuleViolation(childGroupId,
ruleViolationCounter++);
+
+ RuleViolation grandChildRuleViolation1 =
createRuleViolation(grandChildGroupId, ruleViolationCounter++);
+ RuleViolation grandChildRuleViolation2 =
createRuleViolation(grandChildGroupId, ruleViolationCounter++);
+ RuleViolation grandChildRuleViolation3 =
createRuleViolation(grandChildGroupId, ruleViolationCounter);
+
+ ProcessGroup grandChildProcessGroup = mockProcessGroup(
+ grandChildGroupId,
+ Collections.emptyList(),
+ Arrays.asList(grandChildRuleViolation1,
grandChildRuleViolation2, grandChildRuleViolation3)
+ );
+ ProcessGroup childProcessGroup = mockProcessGroup(
+ childGroupId,
+ Arrays.asList(grandChildProcessGroup),
+ Arrays.asList(childRuleViolation1, childRuleViolation2)
+ );
+ mockProcessGroup(
+ rootGroupId,
+ Arrays.asList(childProcessGroup),
+ Arrays.asList(ruleViolation1, ruleViolation2)
+ );
+
+
when(ruleViolationsManager.getRuleViolationsForGroups(Set.of(childGroupId,
grandChildGroupId))).thenReturn(
+ Arrays.asList(
+ childRuleViolation1, childRuleViolation2,
+ grandChildRuleViolation1, grandChildRuleViolation2,
grandChildRuleViolation3)
+ );
+
+ Collection<RuleViolation> expected = new HashSet<>(Arrays.asList(
+ childRuleViolation1, childRuleViolation2,
+ grandChildRuleViolation1, grandChildRuleViolation2,
grandChildRuleViolation3
+ ));
+
+ // WHEN
+ Collection<RuleViolation> actual =
serviceFacade.getRuleViolationStream(childProcessGroup.getIdentifier()).collect(Collectors.toSet());
+
+ // THEN
+ assertEquals(expected, actual);
+ }
+
private RuleViolation createRuleViolation(String groupId, int
ruleViolationCounter) {
return new RuleViolation(
EnforcementPolicy.WARN,