This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c84f40e  NIFI-6111: Fixed bugs in the Status History values. If
                  metrics are not available yet from all nodes for the last
                  point on the graph, leave the cluster aggregate value off
                  for the last point to prevent it from dropping significantly
                  at the end of the chart. Fixed bug where counter values were
                  not properly summed together in cluster view. Addressed
                  issue with Average Task Duration
c84f40e is described below

commit c84f40ee36c253dfc7166a9af385ea1c10a14757
Author: Mark Payne <[email protected]>
AuthorDate: Fri Mar 8 16:29:03 2019 -0500

    NIFI-6111: Fixed bugs in the Status History values. If metrics are not
    available yet from all nodes for the last point on the graph, leave the
    cluster aggregate value off for the last point to prevent it from dropping
    significantly at the end of the chart. Fixed bug where counter values were
    not properly summed together in cluster view. Addressed issue with Average
    Task Duration
    
    This closes #3361.
    
    Signed-off-by: Bryan Bende <[email protected]>
---
 .../endpoints/StatusHistoryEndpointMerger.java     | 22 +++++++++++++++++-----
 .../status/history/StandardStatusSnapshot.java     |  7 ++++++-
 .../org/apache/nifi/util/ComponentMetrics.java     |  6 ++----
 3 files changed, 25 insertions(+), 10 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
index 8e4c26b..90ef471 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
@@ -21,12 +21,12 @@ import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.CounterMetricDescriptor;
 import org.apache.nifi.controller.status.history.MetricDescriptor;
 import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
 import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
 import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
 import 
org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
-import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
 import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
 import org.apache.nifi.controller.status.history.StatusHistoryUtil;
 import org.apache.nifi.controller.status.history.StatusSnapshot;
@@ -157,9 +157,8 @@ public class StatusHistoryEndpointMerger implements 
EndpointResponseMerger {
                         return counters.getOrDefault(descriptorDto.getField(), 
0L);
                     };
 
-                    final MetricDescriptor<ProcessorStatus> metricDescriptor = 
new StandardMetricDescriptor<>(() -> 0, descriptorDto.getField(),
-                        descriptorDto.getLabel(), 
descriptorDto.getDescription(), Formatter.COUNT, valueMapper);
-
+                    final MetricDescriptor<ProcessorStatus> metricDescriptor = 
new CounterMetricDescriptor<>(descriptorDto.getField(), 
descriptorDto.getLabel(),
+                        descriptorDto.getDescription(), Formatter.COUNT, 
valueMapper);
                     metricDescriptors.put(fieldName, metricDescriptor);
                 }
             }
@@ -247,11 +246,24 @@ public class StatusHistoryEndpointMerger implements 
EndpointResponseMerger {
         return snapshot;
     }
 
-    private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> 
snapshotsToAggregate) {
+    private List<StatusSnapshotDTO> aggregate(final Map<Date, 
List<StatusSnapshot>> snapshotsToAggregate) {
         // Aggregate the snapshots
         final List<StatusSnapshotDTO> aggregatedSnapshotDtos = new 
ArrayList<>();
+
+        int iteration = 0;
+        int previousSnapshotCount = 0;
         for (final Map.Entry<Date, List<StatusSnapshot>> entry : 
snapshotsToAggregate.entrySet()) {
             final List<StatusSnapshot> snapshots = entry.getValue();
+
+            // If this is the last snapshot, we don't want to include it 
unless we have stats from all nodes.
+            // Otherwise, when we look at the stats in a chart, the last point 
for the cluster stats often seems to
+            // drop off very steeply.
+            if (++iteration == snapshotsToAggregate.size() && snapshots.size() 
< previousSnapshotCount) {
+                continue;
+            }
+
+            previousSnapshotCount = snapshots.size();
+
             final StatusSnapshot reducedSnapshot = 
snapshots.get(0).getValueReducer().reduce(snapshots);
 
             final StatusSnapshotDTO dto = new StatusSnapshotDTO();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
index 8e78c5e..652ffd5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
@@ -78,10 +78,15 @@ public class StandardStatusSnapshot implements 
StatusSnapshot {
 
 
     public void addStatusMetric(final MetricDescriptor<?> metric, final Long 
value) {
+        if (metric.isCounter()) {
+            addCounterStatusMetric(metric, value);
+            return;
+        }
+
         values[metric.getMetricIdentifier()] = value;
     }
 
-    public void addCounterStatusMetric(final MetricDescriptor<?> metric, final 
Long value) {
+    private void addCounterStatusMetric(final MetricDescriptor<?> metric, 
final Long value) {
         if (counterValues == null) {
             counterValues = new HashMap<>();
         }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
index 7fa2278..5ce1def 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
@@ -73,9 +73,7 @@ public class ComponentMetrics {
         snapshot.setTimestamp(timestamp);
 
         for (final ProcessorStatusDescriptor descriptor : 
ProcessorStatusDescriptor.values()) {
-            if (descriptor.isVisible()) {
-                snapshot.addStatusMetric(descriptor.getDescriptor(), 
descriptor.getDescriptor().getValueFunction().getValue(status));
-            }
+            snapshot.addStatusMetric(descriptor.getDescriptor(), 
descriptor.getDescriptor().getValueFunction().getValue(status));
         }
 
         final Map<String, Long> counters = status.getCounters();
@@ -87,7 +85,7 @@ public class ComponentMetrics {
                 final MetricDescriptor<ProcessorStatus> metricDescriptor = new 
CounterMetricDescriptor<>(entry.getKey(), label, label, 
MetricDescriptor.Formatter.COUNT,
                         s -> s.getCounters() == null ? null : 
s.getCounters().get(counterName));
 
-                snapshot.addCounterStatusMetric(metricDescriptor, 
entry.getValue());
+                snapshot.addStatusMetric(metricDescriptor, entry.getValue());
             }
         }
 

Reply via email to