This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.9.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit ca32e045109f97b3a7e2cd5a810a003ca1f0148b Author: Mark Payne <[email protected]> AuthorDate: Fri Mar 8 16:29:03 2019 -0500 NIFI-6111: Fixed bugs in the Status History values. If metrics are not available yet from all nodes for the last point on the graph, leave the cluster aggregate value off for the last point to prevent it from dropping significantly at the end of the chart. Fixed bug where counter values were not properly summed together in cluster view. Addressed issue with Average Task Duration This closes #3361. Signed-off-by: Bryan Bende <[email protected]> --- .../endpoints/StatusHistoryEndpointMerger.java | 22 +++++++++++++++++----- .../status/history/StandardStatusSnapshot.java | 7 ++++++- .../org/apache/nifi/util/ComponentMetrics.java | 6 ++---- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java index 8e4c26b..90ef471 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java @@ -21,12 +21,12 @@ import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; +import org.apache.nifi.controller.status.history.CounterMetricDescriptor; import org.apache.nifi.controller.status.history.MetricDescriptor; import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor; import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor; -import org.apache.nifi.controller.status.history.StandardMetricDescriptor; import org.apache.nifi.controller.status.history.StandardStatusSnapshot; import org.apache.nifi.controller.status.history.StatusHistoryUtil; import org.apache.nifi.controller.status.history.StatusSnapshot; @@ -157,9 +157,8 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { return counters.getOrDefault(descriptorDto.getField(), 0L); }; - final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(() -> 0, descriptorDto.getField(), - descriptorDto.getLabel(), descriptorDto.getDescription(), Formatter.COUNT, valueMapper); - + final MetricDescriptor<ProcessorStatus> metricDescriptor = new CounterMetricDescriptor<>(descriptorDto.getField(), descriptorDto.getLabel(), + descriptorDto.getDescription(), Formatter.COUNT, valueMapper); metricDescriptors.put(fieldName, metricDescriptor); } } @@ -247,11 +246,24 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { return snapshot; } - private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> snapshotsToAggregate) { + private List<StatusSnapshotDTO> aggregate(final Map<Date, List<StatusSnapshot>> snapshotsToAggregate) { // Aggregate the snapshots final List<StatusSnapshotDTO> aggregatedSnapshotDtos = new ArrayList<>(); + + int iteration = 0; + int previousSnapshotCount = 0; for (final Map.Entry<Date, List<StatusSnapshot>> entry : snapshotsToAggregate.entrySet()) { final List<StatusSnapshot> snapshots = entry.getValue(); + + // If this is the last snapshot, we don't want to include it unless we have stats from all nodes. + // Otherwise, when we look at the stats in a chart, the last point for the cluster stats often seems to + // drop off very steeply. + if (++iteration == snapshotsToAggregate.size() && snapshots.size() < previousSnapshotCount) { + continue; + } + + previousSnapshotCount = snapshots.size(); + final StatusSnapshot reducedSnapshot = snapshots.get(0).getValueReducer().reduce(snapshots); final StatusSnapshotDTO dto = new StatusSnapshotDTO(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java index 8e78c5e..652ffd5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java @@ -78,10 +78,15 @@ public class StandardStatusSnapshot implements StatusSnapshot { public void addStatusMetric(final MetricDescriptor<?> metric, final Long value) { + if (metric.isCounter()) { + addCounterStatusMetric(metric, value); + return; + } + values[metric.getMetricIdentifier()] = value; } - public void addCounterStatusMetric(final MetricDescriptor<?> metric, final Long value) { + private void addCounterStatusMetric(final MetricDescriptor<?> metric, final Long value) { if (counterValues == null) { counterValues = new HashMap<>(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java index 7fa2278..5ce1def 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java @@ -73,9 +73,7 @@ public class ComponentMetrics { snapshot.setTimestamp(timestamp); for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { - if (descriptor.isVisible()) { - snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); - } + snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); } final Map<String, Long> counters = status.getCounters(); @@ -87,7 +85,7 @@ public class ComponentMetrics { final MetricDescriptor<ProcessorStatus> metricDescriptor = new CounterMetricDescriptor<>(entry.getKey(), label, label, MetricDescriptor.Formatter.COUNT, s -> s.getCounters() == null ? null : s.getCounters().get(counterName)); - snapshot.addCounterStatusMetric(metricDescriptor, entry.getValue()); + snapshot.addStatusMetric(metricDescriptor, entry.getValue()); } }
