Repository: incubator-nifi
Updated Branches:
  refs/heads/develop a53cc3d70 -> 8bfc96914


NIFI-534:
- Getting component metrics from Nodes on a set interval using their most 
recent heartbeat in order to ensure there is a data point for each node for 
each window.
- Updating the snapshot reduce method to use the timestamp from the first 
snapshot. This is then the timestamp for the aggregate snapshot.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/f5479428
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/f5479428
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/f5479428

Branch: refs/heads/develop
Commit: f5479428e5040aacedee0f75d72ccf15f8169042
Parents: a53cc3d
Author: Matt Gilman <[email protected]>
Authored: Thu Apr 23 10:13:05 2015 -0400
Committer: Matt Gilman <[email protected]>
Committed: Thu Apr 23 10:13:05 2015 -0400

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 51 +++++++++++++-------
 .../status/history/StandardStatusSnapshot.java  |  8 +++
 2 files changed, 42 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f5479428/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 94ea17f..9a8b014 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -398,6 +398,37 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             snapshotMillis = 
FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY,
 TimeUnit.MILLISECONDS);
         }
         componentStatusSnapshotMillis = snapshotMillis;
+        
+        
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new 
Runnable() {
+            @Override
+            public void run() {
+                readLock.lock();
+                try {
+                    for (final Node node : nodes) {
+                        if (Status.CONNECTED.equals(node.getStatus())) {
+                            ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
+                            if (statusRepository == null) {
+                                statusRepository = 
createComponentStatusRepository();
+                                
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
+                            }
+                            
+                            // ensure this node has a payload
+                            if (node.getHeartbeat() != null && 
node.getHeartbeatPayload() != null) {
+                                // if nothing has been captured or the current 
heartbeat is newer, capture it - comparing the heatbeat created timestamp
+                                // is safe since its marked as XmlTransient so 
we're assured that its based off the same clock that created the last capture 
date
+                                if (statusRepository.getLastCaptureDate() == 
null || node.getHeartbeat().getCreatedTimestamp() > 
statusRepository.getLastCaptureDate().getTime()) {
+                                    
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
+                                }
+                            }
+                        }
+                    }
+                } catch(final Throwable t) {
+                    logger.warn("Unable to capture component metrics from Node 
heartbeats: " + t);
+                } finally {
+                    readLock.unlock("capture component metrics from node 
heartbeats");
+                }
+            }
+        }, componentStatusSnapshotMillis, componentStatusSnapshotMillis, 
TimeUnit.MILLISECONDS);
 
         remoteInputPort = properties.getRemoteInputPort();
         if (remoteInputPort == null) {
@@ -1807,20 +1838,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
                         // record heartbeat
                         node.setHeartbeat(mostRecentHeartbeat);
-
-                        ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
-                        if (statusRepository == null) {
-                            statusRepository = 
createComponentStatusRepository();
-                            
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
-                        }
-
-                        // If it's been a while since we've captured, capture 
this metric.
-                        final Date lastCaptureDate = 
statusRepository.getLastCaptureDate();
-                        final long millisSinceLastCapture = (lastCaptureDate 
== null) ? Long.MAX_VALUE : (System.currentTimeMillis() - 
lastCaptureDate.getTime());
-
-                        if (millisSinceLastCapture > 
componentStatusSnapshotMillis) {
-                            
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
-                        }
                     }
                 } catch (final Exception e) {
                     logger.error("Failed to process heartbeat from {}:{} due 
to {}", mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), 
mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString());
@@ -3877,11 +3894,11 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         // Aggregate the snapshots
         final List<StatusSnapshotDTO> aggregatedSnapshotDtos = new 
ArrayList<>();
         for (final Map.Entry<Date, List<StatusSnapshot>> entry : 
snapshotsToAggregate.entrySet()) {
-            final StatusSnapshotDTO dto = new StatusSnapshotDTO();
-            dto.setTimestamp(entry.getKey());
-
             final List<StatusSnapshot> snapshots = entry.getValue();
             final StatusSnapshot reducedSnapshot = 
snapshots.get(0).getValueReducer().reduce(snapshots);
+            
+            final StatusSnapshotDTO dto = new StatusSnapshotDTO();
+            dto.setTimestamp(reducedSnapshot.getTimestamp());
             
dto.setStatusMetrics(StatusHistoryUtil.createStatusSnapshotDto(reducedSnapshot).getStatusMetrics());
 
             aggregatedSnapshotDtos.add(dto);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f5479428/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
index 69bbbf7..e1fdca8 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java
@@ -51,12 +51,20 @@ public class StandardStatusSnapshot implements 
StatusSnapshot {
         return new ValueReducer<StatusSnapshot, StatusSnapshot>() {
             @Override
             public StatusSnapshot reduce(final List<StatusSnapshot> values) {
+                Date reducedTimestamp = null;
                 final Set<MetricDescriptor<?>> allDescriptors = new 
LinkedHashSet<>(metricValues.keySet());
+                
                 for (final StatusSnapshot statusSnapshot : values) {
+                    if (reducedTimestamp == null) {
+                        reducedTimestamp = statusSnapshot.getTimestamp();
+                    }
                     
allDescriptors.addAll(statusSnapshot.getStatusMetrics().keySet());
                 }
 
                 final StandardStatusSnapshot reduced = new 
StandardStatusSnapshot();
+                if (reducedTimestamp != null) {
+                    reduced.setTimestamp(reducedTimestamp);
+                }
 
                 for (final MetricDescriptor<?> descriptor : allDescriptors) {
                     final Long descriptorValue = 
descriptor.getValueReducer().reduce(values);

Reply via email to