This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c84f40e NIFI-6111: Fixed bugs in the Status History values. If
metrics are not available yet from all nodes for the last point on the graph,
leave the cluster aggregate value off for the last point to prevent it from
dropping significantly at the end of the chart. Fixed bug where counter values
were not properly summed together in cluster view. Addressed issue with Average
Task Duration
c84f40e is described below
commit c84f40ee36c253dfc7166a9af385ea1c10a14757
Author: Mark Payne <[email protected]>
AuthorDate: Fri Mar 8 16:29:03 2019 -0500
NIFI-6111: Fixed bugs in the Status History values. If metrics are not
available yet from all nodes for the last point on the graph, leave the cluster
aggregate value off for the last point to prevent it from dropping
significantly at the end of the chart. Fixed bug where counter values were not
properly summed together in cluster view. Addressed issue with Average Task
Duration
This closes #3361.
Signed-off-by: Bryan Bende <[email protected]>
---
.../endpoints/StatusHistoryEndpointMerger.java | 22 +++++++++++++++++-----
.../status/history/StandardStatusSnapshot.java | 7 ++++++-
.../org/apache/nifi/util/ComponentMetrics.java | 6 ++----
3 files changed, 25 insertions(+), 10 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
index 8e4c26b..90ef471 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java
@@ -21,12 +21,12 @@ import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.CounterMetricDescriptor;
import org.apache.nifi.controller.status.history.MetricDescriptor;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
import
org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
-import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.controller.status.history.StatusSnapshot;
@@ -157,9 +157,8 @@ public class StatusHistoryEndpointMerger implements
EndpointResponseMerger {
return counters.getOrDefault(descriptorDto.getField(),
0L);
};
- final MetricDescriptor<ProcessorStatus> metricDescriptor =
new StandardMetricDescriptor<>(() -> 0, descriptorDto.getField(),
- descriptorDto.getLabel(),
descriptorDto.getDescription(), Formatter.COUNT, valueMapper);
-
+ final MetricDescriptor<ProcessorStatus> metricDescriptor =
new CounterMetricDescriptor<>(descriptorDto.getField(),
descriptorDto.getLabel(),
+ descriptorDto.getDescription(), Formatter.COUNT,
valueMapper);
metricDescriptors.put(fieldName, metricDescriptor);
}
}
@@ -247,11 +246,24 @@ public class StatusHistoryEndpointMerger implements
EndpointResponseMerger {
return snapshot;
}
- private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>>
snapshotsToAggregate) {
+ private List<StatusSnapshotDTO> aggregate(final Map<Date,
List<StatusSnapshot>> snapshotsToAggregate) {
// Aggregate the snapshots
final List<StatusSnapshotDTO> aggregatedSnapshotDtos = new
ArrayList<>();
+
+ int iteration = 0;
+ int previousSnapshotCount = 0;
for (final Map.Entry<Date, List<StatusSnapshot>> entry :
snapshotsToAggregate.entrySet()) {
final List<StatusSnapshot> snapshots = entry.getValue();
+
+ // If this is the last snapshot, we don't want to include it unless we have stats from all nodes.
+ // Otherwise, when we look at the stats in a chart, the last point for the cluster stats often seems to
+ // drop off very steeply.
+ if (++iteration == snapshotsToAggregate.size() && snapshots.size()
< previousSnapshotCount) {
+ continue;
+ }
+
+ previousSnapshotCount = snapshots.size();
+
final StatusSnapshot reducedSnapshot =
snapshots.get(0).getValueReducer().reduce(snapshots);
final StatusSnapshotDTO dto = new StatusSnapshotDTO();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
index 8e78c5e..652ffd5 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
@@ -78,10 +78,15 @@ public class StandardStatusSnapshot implements
StatusSnapshot {
public void addStatusMetric(final MetricDescriptor<?> metric, final Long
value) {
+ if (metric.isCounter()) {
+ addCounterStatusMetric(metric, value);
+ return;
+ }
+
values[metric.getMetricIdentifier()] = value;
}
- public void addCounterStatusMetric(final MetricDescriptor<?> metric, final
Long value) {
+ private void addCounterStatusMetric(final MetricDescriptor<?> metric,
final Long value) {
if (counterValues == null) {
counterValues = new HashMap<>();
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
index 7fa2278..5ce1def 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java
@@ -73,9 +73,7 @@ public class ComponentMetrics {
snapshot.setTimestamp(timestamp);
for (final ProcessorStatusDescriptor descriptor :
ProcessorStatusDescriptor.values()) {
- if (descriptor.isVisible()) {
- snapshot.addStatusMetric(descriptor.getDescriptor(),
descriptor.getDescriptor().getValueFunction().getValue(status));
- }
+ snapshot.addStatusMetric(descriptor.getDescriptor(),
descriptor.getDescriptor().getValueFunction().getValue(status));
}
final Map<String, Long> counters = status.getCounters();
@@ -87,7 +85,7 @@ public class ComponentMetrics {
final MetricDescriptor<ProcessorStatus> metricDescriptor = new
CounterMetricDescriptor<>(entry.getKey(), label, label,
MetricDescriptor.Formatter.COUNT,
s -> s.getCounters() == null ? null :
s.getCounters().get(counterName));
- snapshot.addCounterStatusMetric(metricDescriptor,
entry.getValue());
+ snapshot.addStatusMetric(metricDescriptor, entry.getValue());
}
}