This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8bb2e0790a9 NIFI-15761: Consolidate verification messages across nodes
(#11062)
8bb2e0790a9 is described below
commit 8bb2e0790a9decd3f87afed491beef50b16a276d
Author: Bob Paulin <[email protected]>
AuthorDate: Sun Mar 29 04:19:49 2026 -0500
NIFI-15761: Consolidate verification messages across nodes (#11062)
* If messages are the same across nodes, we should consolidate them.
* If messages differ, include node information so the user can determine
which node is failing.
---
.../endpoints/ConfigVerificationResultMerger.java | 66 +++----
.../ConfigVerificationResultMergerTest.java | 198 +++++++++++++++++++++
2 files changed, 234 insertions(+), 30 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConfigVerificationResultMerger.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConfigVerificationResultMerger.java
index e7a11a47d4f..254c47d7a72 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConfigVerificationResultMerger.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConfigVerificationResultMerger.java
@@ -30,6 +30,8 @@ import java.util.Map;
public class ConfigVerificationResultMerger {
private final Map<String, List<ConfigVerificationResultDTO>>
verificationResultDtos = new HashMap<>();
+ private record NodeResult(String nodeId, ConfigVerificationResultDTO
result) { }
+
/**
* Adds the config verification results for one of the nodes in the cluster
* @param nodeId the ID of the node in the cluster
@@ -44,12 +46,16 @@ public class ConfigVerificationResultMerger {
}
/**
- * Computes the aggregate list of ConfigVerificationResultDTO based on all
of the results added using the {link {@link #addNodeResults(NodeIdentifier,
List)}} method
+ * Computes the aggregate list of ConfigVerificationResultDTO based on all
of the results added using the {@link #addNodeResults(NodeIdentifier,
List)} method.
+ *
+ * <p>Node address information is only included in the explanation when
nodes produce different outcomes or explanations for the same
+ * verification step and the aggregate outcome is not SUCCESSFUL. When all
nodes agree (including single-node deployments), or the aggregate outcome
is SUCCESSFUL, the explanation is returned without any node prefix.</p>
+ *
* @return the aggregate results of the config verification results from
all nodes
*/
public List<ConfigVerificationResultDTO> computeAggregateResults() {
// For each node, build up a mapping of Step Name -> Results
- final Map<String, List<ConfigVerificationResultDTO>> resultsByStepName
= new HashMap<>();
+ final Map<String, List<NodeResult>> resultsByStepName = new
HashMap<>();
for (final Map.Entry<String, List<ConfigVerificationResultDTO>> entry
: verificationResultDtos.entrySet()) {
final String nodeId = entry.getKey();
final List<ConfigVerificationResultDTO> nodeResults =
entry.getValue();
@@ -60,45 +66,45 @@ public class ConfigVerificationResultMerger {
}
for (final ConfigVerificationResultDTO result : nodeResults) {
- final String stepName = result.getVerificationStepName();
- final List<ConfigVerificationResultDTO> resultList =
resultsByStepName.computeIfAbsent(stepName, key -> new ArrayList<>());
-
- // If skipped or unsuccessful, add the node's address to the
explanation
- if (!Outcome.SUCCESSFUL.name().equals(result.getOutcome())) {
- result.setExplanation(nodeId + " - " +
result.getExplanation());
- }
-
- resultList.add(result);
+
resultsByStepName.computeIfAbsent(result.getVerificationStepName(), key -> new
ArrayList<>())
+ .add(new NodeResult(nodeId, result));
}
}
// Merge together all results for each step name
final List<ConfigVerificationResultDTO> aggregateResults = new
ArrayList<>();
- for (final Map.Entry<String, List<ConfigVerificationResultDTO>> entry
: resultsByStepName.entrySet()) {
+ for (final Map.Entry<String, List<NodeResult>> entry :
resultsByStepName.entrySet()) {
final String stepName = entry.getKey();
- final List<ConfigVerificationResultDTO> resultList =
entry.getValue();
-
- final ConfigVerificationResultDTO firstResult = resultList.get(0);
// This is safe because the list won't be added to the map unless it has at
least 1 element.
- String outcome = firstResult.getOutcome();
- String explanation = firstResult.getExplanation();
-
- for (final ConfigVerificationResultDTO result : resultList) {
- // If any node indicates failure, the outcome is failure.
- // Otherwise, if any node indicates that a step was skipped,
the outcome is skipped.
- // Otherwise, all nodes have reported the outcome is
successful, so the outcome is successful.
- if (Outcome.FAILED.name().equals(result.getOutcome())) {
- outcome = result.getOutcome();
- explanation = result.getExplanation();
- } else if (Outcome.SKIPPED.name().equals(result.getOutcome())
&& Outcome.SUCCESSFUL.name().equals(outcome)) {
- outcome = result.getOutcome();
- explanation = result.getExplanation();
+ final List<NodeResult> stepResults = entry.getValue();
+
+ // Select the worst outcome: FAILED > SKIPPED > SUCCESSFUL
+ NodeResult selected = stepResults.get(0);
+ for (final NodeResult nodeResult : stepResults) {
+ if
(Outcome.FAILED.name().equals(nodeResult.result().getOutcome())) {
+ selected = nodeResult;
+ } else if
(Outcome.SKIPPED.name().equals(nodeResult.result().getOutcome())
+ &&
Outcome.SUCCESSFUL.name().equals(selected.result().getOutcome())) {
+ selected = nodeResult;
}
}
+ // Only include node prefix when results differ across nodes
+ final long distinctResultCount = stepResults.stream()
+ .map(nodeResult -> nodeResult.result().getOutcome() + " -- " +
nodeResult.result().getExplanation())
+ .distinct()
+ .count();
+
+ final String aggregateExplanation;
+ if (distinctResultCount > 1 &&
!Outcome.SUCCESSFUL.name().equals(selected.result().getOutcome())) {
+ aggregateExplanation = selected.nodeId() + " - " +
selected.result().getExplanation();
+ } else {
+ aggregateExplanation = selected.result().getExplanation();
+ }
+
final ConfigVerificationResultDTO resultDto = new
ConfigVerificationResultDTO();
resultDto.setVerificationStepName(stepName);
- resultDto.setOutcome(outcome);
- resultDto.setExplanation(explanation);
+ resultDto.setOutcome(selected.result().getOutcome());
+ resultDto.setExplanation(aggregateExplanation);
aggregateResults.add(resultDto);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/ConfigVerificationResultMergerTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/ConfigVerificationResultMergerTest.java
new file mode 100644
index 00000000000..52059d566d2
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/ConfigVerificationResultMergerTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ConfigVerificationResultMergerTest {
+
+ private static final NodeIdentifier NODE_1 = new NodeIdentifier("node-1",
"host-1", 8080, "host-1", 19998, null, null, null, false);
+ private static final NodeIdentifier NODE_2 = new NodeIdentifier("node-2",
"host-2", 8081, "host-2", 19999, null, null, null, false);
+
+ @Test
+ void testSingleNodeSuccessful() {
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of(createResult("Step 1",
Outcome.SUCCESSFUL, "All good")));
+
+ final List<ConfigVerificationResultDTO> results =
merger.computeAggregateResults();
+
+ assertNotNull(results);
+ assertEquals(1, results.size());
+ assertEquals(Outcome.SUCCESSFUL.name(), results.get(0).getOutcome());
+ assertEquals("All good", results.get(0).getExplanation());
+ }
+
+ @Test
+ void testSingleNodeFailed() {
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of(createResult("Step 1",
Outcome.FAILED, "Connection refused")));
+
+ final List<ConfigVerificationResultDTO> results =
merger.computeAggregateResults();
+
+ assertNotNull(results);
+ assertEquals(1, results.size());
+ assertEquals(Outcome.FAILED.name(), results.get(0).getOutcome());
+ assertEquals("Connection refused", results.get(0).getExplanation());
+ }
+
+ @Test
+ void testMultiNodeAllAgreeOnFailure() {
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of(createResult("Step 1",
Outcome.FAILED, "Connection refused")));
+ merger.addNodeResults(NODE_2, List.of(createResult("Step 1",
Outcome.FAILED, "Connection refused")));
+
+ final List<ConfigVerificationResultDTO> results =
merger.computeAggregateResults();
+
+ assertNotNull(results);
+ assertEquals(1, results.size());
+ assertEquals(Outcome.FAILED.name(), results.get(0).getOutcome());
+ assertEquals("Connection refused", results.get(0).getExplanation());
+ }
+
+ @Test
+ void testMultiNodeAllAgreeOnSuccess() {
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of(createResult("Step 1",
Outcome.SUCCESSFUL, "Connected")));
+ merger.addNodeResults(NODE_2, List.of(createResult("Step 1",
Outcome.SUCCESSFUL, "Connected")));
+
+ final List<ConfigVerificationResultDTO> results =
merger.computeAggregateResults();
+
+ assertNotNull(results);
+ assertEquals(1, results.size());
+ assertEquals(Outcome.SUCCESSFUL.name(), results.get(0).getOutcome());
+ assertEquals("Connected", results.get(0).getExplanation());
+ }
+
+ @Test
+ void testMultiNodeDisagreeOnOutcome() {
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of(createResult("Step 1",
Outcome.SUCCESSFUL, "Connected")));
+ merger.addNodeResults(NODE_2, List.of(createResult("Step 1",
Outcome.FAILED, "Connection refused")));
+
+ final List<ConfigVerificationResultDTO> results =
merger.computeAggregateResults();
+
+ assertNotNull(results);
+ assertEquals(1, results.size());
+ assertEquals(Outcome.FAILED.name(), results.get(0).getOutcome());
+ assertEquals("host-2:8081 - Connection refused",
results.get(0).getExplanation());
+ }
+
+ @Test
+ void testMultiNodeDisagreeOnExplanation() {
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of(createResult("Step 1",
Outcome.FAILED, "Timeout after 5s")));
+ merger.addNodeResults(NODE_2, List.of(createResult("Step 1",
Outcome.FAILED, "Connection refused")));
+
+ final List<ConfigVerificationResultDTO> results =
merger.computeAggregateResults();
+
+ assertNotNull(results);
+ assertEquals(1, results.size());
+ assertEquals(Outcome.FAILED.name(), results.get(0).getOutcome());
+ final String explanation = results.get(0).getExplanation();
+ assertTrue(explanation.contains(" - "), "Node prefix should be
included when explanations differ");
+ assertTrue(explanation.startsWith("host-"), "Explanation should start
with node address prefix");
+ }
+
+ @Test
+ void testMultiNodeSkippedOverridesSuccess() {
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of(createResult("Step 1",
Outcome.SUCCESSFUL, "OK")));
+ merger.addNodeResults(NODE_2, List.of(createResult("Step 1",
Outcome.SKIPPED, "Skipped due to config")));
+
+ final List<ConfigVerificationResultDTO> results =
merger.computeAggregateResults();
+
+ assertNotNull(results);
+ assertEquals(1, results.size());
+ assertEquals(Outcome.SKIPPED.name(), results.get(0).getOutcome());
+ assertEquals("host-2:8081 - Skipped due to config",
results.get(0).getExplanation());
+ }
+
+ @Test
+ void testNullResultsFilteredByAddNodeResults() {
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of(createResult("Step 1",
Outcome.SUCCESSFUL, "OK")));
+ merger.addNodeResults(NODE_2, null);
+
+ final List<ConfigVerificationResultDTO> results =
merger.computeAggregateResults();
+
+ assertNotNull(results);
+ assertEquals(1, results.size());
+ assertEquals("OK", results.get(0).getExplanation());
+ }
+
+ @Test
+ void testInputDtosNotMutated() {
+ final ConfigVerificationResultDTO node1Result = createResult("Step 1",
Outcome.FAILED, "Connection refused");
+ final ConfigVerificationResultDTO node2Result = createResult("Step 1",
Outcome.FAILED, "Connection refused");
+
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of(node1Result));
+ merger.addNodeResults(NODE_2, List.of(node2Result));
+
+ merger.computeAggregateResults();
+
+ assertEquals("Connection refused", node1Result.getExplanation());
+ assertEquals("Connection refused", node2Result.getExplanation());
+ }
+
+ @Test
+ void testMultipleStepsPreserveOrder() {
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of(
+ createResult("Step A", Outcome.SUCCESSFUL, "OK"),
+ createResult("Step B", Outcome.FAILED, "Bad config"),
+ createResult("Step C", Outcome.SKIPPED, "Skipped")));
+
+ final List<ConfigVerificationResultDTO> results =
merger.computeAggregateResults();
+
+ assertNotNull(results);
+ assertEquals(3, results.size());
+ assertEquals("Step A", results.get(0).getVerificationStepName());
+ assertEquals("Step B", results.get(1).getVerificationStepName());
+ assertEquals("Step C", results.get(2).getVerificationStepName());
+ }
+
+ @Test
+ void testEmptyResultsIgnored() {
+ final ConfigVerificationResultMerger merger = new
ConfigVerificationResultMerger();
+ merger.addNodeResults(NODE_1, List.of());
+ merger.addNodeResults(NODE_2, List.of(createResult("Step 1",
Outcome.SUCCESSFUL, "OK")));
+
+ final List<ConfigVerificationResultDTO> results =
merger.computeAggregateResults();
+
+ assertNotNull(results);
+ assertEquals(1, results.size());
+ assertEquals("OK", results.get(0).getExplanation());
+ }
+
+ private static ConfigVerificationResultDTO createResult(final String
stepName, final Outcome outcome, final String explanation) {
+ final ConfigVerificationResultDTO dto = new
ConfigVerificationResultDTO();
+ dto.setVerificationStepName(stepName);
+ dto.setOutcome(outcome.name());
+ dto.setExplanation(explanation);
+ return dto;
+ }
+}