Repository: incubator-nifi Updated Branches: refs/heads/develop a53cc3d70 -> 8bfc96914
NIFI-534: - Getting component metrics from Nodes on a set interval using their most recent heartbeat in order to ensure there is a data point for each node for each window. - Updating the snapshot reduce method to use the timestamp from the first snapshot. This is then the timestamp for the aggregate snapshot. Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/f5479428 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/f5479428 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/f5479428 Branch: refs/heads/develop Commit: f5479428e5040aacedee0f75d72ccf15f8169042 Parents: a53cc3d Author: Matt Gilman <[email protected]> Authored: Thu Apr 23 10:13:05 2015 -0400 Committer: Matt Gilman <[email protected]> Committed: Thu Apr 23 10:13:05 2015 -0400 ---------------------------------------------------------------------- .../cluster/manager/impl/WebClusterManager.java | 51 +++++++++++++------- .../status/history/StandardStatusSnapshot.java | 8 +++ 2 files changed, 42 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f5479428/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 94ea17f..9a8b014 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -398,6 +398,37 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); } componentStatusSnapshotMillis = snapshotMillis; + + Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + readLock.lock(); + try { + for (final Node node : nodes) { + if (Status.CONNECTED.equals(node.getStatus())) { + ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); + if (statusRepository == null) { + statusRepository = createComponentStatusRepository(); + componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository); + } + + // ensure this node has a payload + if (node.getHeartbeat() != null && node.getHeartbeatPayload() != null) { + // if nothing has been captured or the current heartbeat is newer, capture it - comparing the heatbeat created timestamp + // is safe since its marked as XmlTransient so we're assured that its based off the same clock that created the last capture date + if (statusRepository.getLastCaptureDate() == null || node.getHeartbeat().getCreatedTimestamp() > statusRepository.getLastCaptureDate().getTime()) { + statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus()); + } + } + } + } + } catch(final Throwable t) { + logger.warn("Unable to capture component metrics from Node heartbeats: " + t); + } finally { + readLock.unlock("capture component metrics from node heartbeats"); + } + } + }, componentStatusSnapshotMillis, componentStatusSnapshotMillis, TimeUnit.MILLISECONDS); remoteInputPort = properties.getRemoteInputPort(); if (remoteInputPort == null) { @@ -1807,20 +1838,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // record heartbeat node.setHeartbeat(mostRecentHeartbeat); - - ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - statusRepository = createComponentStatusRepository(); - componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository); - } - - // If it's been a while since we've captured, capture this metric. - final Date lastCaptureDate = statusRepository.getLastCaptureDate(); - final long millisSinceLastCapture = (lastCaptureDate == null) ? Long.MAX_VALUE : (System.currentTimeMillis() - lastCaptureDate.getTime()); - - if (millisSinceLastCapture > componentStatusSnapshotMillis) { - statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus()); - } } } catch (final Exception e) { logger.error("Failed to process heartbeat from {}:{} due to {}", mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString()); @@ -3877,11 +3894,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // Aggregate the snapshots final List<StatusSnapshotDTO> aggregatedSnapshotDtos = new ArrayList<>(); for (final Map.Entry<Date, List<StatusSnapshot>> entry : snapshotsToAggregate.entrySet()) { - final StatusSnapshotDTO dto = new StatusSnapshotDTO(); - dto.setTimestamp(entry.getKey()); - final List<StatusSnapshot> snapshots = entry.getValue(); final StatusSnapshot reducedSnapshot = snapshots.get(0).getValueReducer().reduce(snapshots); + + final StatusSnapshotDTO dto = new StatusSnapshotDTO(); + dto.setTimestamp(reducedSnapshot.getTimestamp()); dto.setStatusMetrics(StatusHistoryUtil.createStatusSnapshotDto(reducedSnapshot).getStatusMetrics()); aggregatedSnapshotDtos.add(dto); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f5479428/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java index 69bbbf7..e1fdca8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java @@ -51,12 +51,20 @@ public class StandardStatusSnapshot implements StatusSnapshot { return new ValueReducer<StatusSnapshot, StatusSnapshot>() { @Override public StatusSnapshot reduce(final List<StatusSnapshot> values) { + Date reducedTimestamp = null; final Set<MetricDescriptor<?>> allDescriptors = new LinkedHashSet<>(metricValues.keySet()); + for (final StatusSnapshot statusSnapshot : values) { + if (reducedTimestamp == null) { + reducedTimestamp = statusSnapshot.getTimestamp(); + } allDescriptors.addAll(statusSnapshot.getStatusMetrics().keySet()); } final StandardStatusSnapshot reduced = new StandardStatusSnapshot(); + if (reducedTimestamp != null) { + reduced.setTimestamp(reducedTimestamp); + } for (final MetricDescriptor<?> descriptor : allDescriptors) { final Long descriptorValue = descriptor.getValueReducer().reduce(values);
