This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit a4cab9c30211663360093f3b62fce9145cd232fd Author: Andrew I. Christianson <[email protected]> AuthorDate: Tue Jul 16 15:57:14 2019 -0400 NIFI-6510 Connect the dots for StatusAnalytics -> API --- ...alytics.java => ConnectionStatusAnalytics.java} | 9 +- .../status/analytics/StatusAnalytics.java | 8 +- .../org/apache/nifi/controller/FlowController.java | 10 +- .../status/analytics/StatusAnalyticEngine.java | 106 ++++++++++++++++----- .../org/apache/nifi/web/api/dto/DtoFactory.java | 4 +- .../nifi/web/controller/ControllerFacade.java | 9 +- 6 files changed, 114 insertions(+), 32 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java similarity index 79% copy from nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java copy to nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java index d6ad3bc..12c8a15 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -16,6 +16,13 @@ */ package org.apache.nifi.controller.status.analytics; -public interface StatusAnalytics { +public interface ConnectionStatusAnalytics { long getMinTimeToBackpressureMillis(); + String getGroupId(); + String getId(); + String getName(); + String getSourceId(); + String getSourceName(); + String getDestinationId(); + String getDestinationName(); } diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java index d6ad3bc..42c2abd 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java @@ -16,6 +16,12 @@ */ package org.apache.nifi.controller.status.analytics; +/** + * StatusAnalytics + */ public interface StatusAnalytics { - long getMinTimeToBackpressureMillis(); + + ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId); + + public long getMinTimeToBackpressureMillis(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 0c422b4..56272ff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -350,6 +350,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node // guarded by rwLock private NodeConnectionStatus connectionStatus; + private StatusAnalyticEngine analyticsEngine; + // guarded by rwLock private String instanceId; @@ -603,7 +605,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); - StatusAnalytics analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository); + analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { @Override @@ -614,7 +616,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node LOG.error("Failed to capture component stats for Stats History", e); } } - }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval + }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval (or maybe just compute on the fly when requested) this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED); heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false)); @@ -1387,6 +1389,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node return eventAccess; } + public StatusAnalytics getStatusAnalytics() { + return analyticsEngine; + } + /** * Sets the root group to the given group * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java index 9231707..64c2065 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java @@ -43,22 +43,28 @@ public class StatusAnalyticEngine implements StatusAnalytics { } @Override - public long getMinTimeToBackpressureMillis() { + public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) { ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); - List<Connection> allConnections = rootGroup.findAllConnections(); - long minTimeToBackpressure = Long.MAX_VALUE; + return getConnectionStatusAnalytics(rootGroup.findConnection(connectionId)); + } - for (Connection conn : allConnections) { - LOG.info("Getting connection history for: " + conn.getIdentifier()); - Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000)); - StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(statusRepository - .getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE)); - List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots(); - - if (aggregateSnapshots.size() < 2) { - LOG.info("Not enough data to model time to backpressure."); - continue; - } + /** + * Finds the number of millis until the given connection will experience backpressure. + * @param conn the connection to run the analytic on + * @return + */ + public ConnectionStatusAnalytics getConnectionStatusAnalytics(Connection conn) { + LOG.info("Getting connection history for: " + conn.getIdentifier()); + long connTimeToBackpressure; + Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000)); + StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO( + statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE)); + List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots(); + + if (aggregateSnapshots.size() < 2) { + LOG.info("Not enough data to model time to backpressure."); + connTimeToBackpressure = Long.MAX_VALUE; + } else { long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold(); LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is " @@ -77,16 +83,72 @@ public class StatusAnalyticEngine implements StatusAnalytics { // Skip this connection if its queue is declining. if (regression.getSlope() <= 0) { LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure."); - continue; + connTimeToBackpressure = Long.MAX_VALUE; + } else { + + // Compute time-to backpressure for this connection; Reduce total result iff + // this connection is lower. + connTimeToBackpressure = Math + .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope()) + - System.currentTimeMillis(); + LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure); + } + } + + return new ConnectionStatusAnalytics() { + + @Override + public String getSourceName() { + return conn.getSource().getName(); + } + + @Override + public String getSourceId() { + return conn.getSource().getIdentifier(); + } + + @Override + public String getName() { + return conn.getName(); + } + + @Override + public long getMinTimeToBackpressureMillis() { + return connTimeToBackpressure; + } + + @Override + public String getId() { + return conn.getIdentifier(); + } + + @Override + public String getGroupId() { + return conn.getProcessGroup().getIdentifier(); } - // Compute time-to backpressure for this connection; Reduce total result iff - // this connection is lower. - long connTimeToBackpressure = Math - .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope()) - - System.currentTimeMillis(); - LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure); - minTimeToBackpressure = Math.min(minTimeToBackpressure, connTimeToBackpressure); + @Override + public String getDestinationName() { + return conn.getDestination().getName(); + } + + @Override + public String getDestinationId() { + return conn.getDestination().getIdentifier(); + } + }; + } + + @Override + public long getMinTimeToBackpressureMillis() { + ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); + List<Connection> allConnections = rootGroup.findAllConnections(); + rootGroup.findConnection("asdf"); + long minTimeToBackpressure = Long.MAX_VALUE; + + for (Connection conn : allConnections) { + ConnectionStatusAnalytics connAnalytics = getConnectionStatusAnalytics(conn); + minTimeToBackpressure = Math.min(minTimeToBackpressure, connAnalytics.getMinTimeToBackpressureMillis()); } LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 6903e44..b8fed37 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -105,7 +105,7 @@ import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.controller.status.analytics.StatusAnalytics; +import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics; import org.apache.nifi.controller.status.history.GarbageCollectionHistory; import org.apache.nifi.controller.status.history.GarbageCollectionStatus; import org.apache.nifi.diagnostics.GarbageCollection; @@ -1189,7 +1189,7 @@ public final class DtoFactory { return connectionStatusDto; } - public ConnectionStatisticsDTO createConnectionStatisticsDto(final StatusAnalytics connectionStatistics) { + public ConnectionStatisticsDTO createConnectionStatisticsDto(final ConnectionStatusAnalytics connectionStatistics) { final ConnectionStatisticsDTO connectionStatisticsDTO = new ConnectionStatisticsDTO(); connectionStatisticsDTO.setGroupId(connectionStatistics.getGroupId()); connectionStatisticsDTO.setId(connectionStatistics.getId()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index c1b6754..367ea51 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -56,6 +56,7 @@ import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics; import org.apache.nifi.controller.status.analytics.StatusAnalytics; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.diagnostics.SystemDiagnostics; @@ -687,7 +688,7 @@ public class ControllerFacade implements Authorizable { * @param connectionId connection id * @return the statistics for the specified connection */ - public StatusAnalytics getConnectionStatistics(final String connectionId) { + public ConnectionStatusAnalytics getConnectionStatistics(final String connectionId) { final ProcessGroup root = getRootGroup(); final Connection connection = root.findConnection(connectionId); @@ -703,13 +704,13 @@ public class ControllerFacade implements Authorizable { throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); } - // TODO get from flow controller - final StatusAnalytics status; + // get from flow controller + final StatusAnalytics status = flowController.getStatusAnalytics(); if (status == null) { throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId)); } - return status; + return status.getConnectionStatusAnalytics(connectionId); } /**
