This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 2d8c70f907ee72391aa9e18f037b8f9b7c6dbb86
Author: Andrew I. Christianson <[email protected]>
AuthorDate: Thu Jul 11 12:02:22 2019 -0400

    NIFI-6510 Implemented basic linear regression model for queue counts
---
 .../nifi-framework/nifi-framework-core/pom.xml     |  5 ++
 .../org/apache/nifi/controller/FlowController.java |  2 +-
 .../status/analytics/StatusAnalyticEngine.java     | 72 +++++++++++++++++++---
 3 files changed, 69 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index a1bff42..6551d54 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -134,6 +134,11 @@
             <artifactId>commons-io</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>3.6.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-data-provenance-utils</artifactId>
             <version>1.10.0-SNAPSHOT</version>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4c0288f..f7ed734 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -608,7 +608,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             @Override
             public void run() {
                 try {
-                    analyticsEngine.getMinTimeToBackpressure();
+                    analyticsEngine.getMinTimeToBackpressureMillis();
                 } catch (final Exception e) {
                     LOG.error("Failed to capture component stats for Stats History", e);
                 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 8b69ebf..0602a93 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -16,12 +16,14 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import java.util.Date;
 import java.util.List;
-import java.util.Map.Entry;
 
+import org.apache.commons.math3.stat.regression.SimpleRegression;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
 import org.apache.nifi.controller.status.history.StatusHistoryUtil;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
@@ -40,21 +42,73 @@ public class StatusAnalyticEngine {
         this.statusRepository = statusRepository;
     }
 
-    public long getMinTimeToBackpressure() {
+    /**
+     * Estimates how long until the first connection in the flow reaches its back pressure object
+     * threshold, using a least-squares linear fit of each connection's queued FlowFile count over
+     * the last five minutes of status history.
+     *
+     * @return the smallest predicted time to back pressure across all connections, in
+     *         milliseconds; 0 if a connection is modeled as already at or past its threshold;
+     *         {@link Long#MAX_VALUE} if no connection could be modeled with a growing queue
+     */
+    public long getMinTimeToBackpressureMillis() {
         ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
         List<Connection> allConnections = rootGroup.findAllConnections();
+        // Capture "now" once so that every connection is modeled relative to the same instant.
+        final long now = System.currentTimeMillis();
+        final Date minDate = new Date(now - (5 * 60 * 1000));
+        long minTimeToBackpressure = Long.MAX_VALUE;
 
         for (Connection conn : allConnections) {
             LOG.info("Getting connection history for: " + conn.getIdentifier());
-            StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
-                    statusRepository.getConnectionStatusHistory(conn.getIdentifier(), null, null, Integer.MAX_VALUE));
-            for (StatusSnapshotDTO snap : connHistory.getAggregateSnapshots()) {
-                for (Entry<String, Long> snapEntry : snap.getStatusMetrics().entrySet()) {
-                    LOG.info("Snap " + snapEntry.getKey() + ": " + snapEntry.getValue());
-                }
+            long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
+            if (backPressureObjectThreshold <= 0) {
+                // A non-positive object threshold disables count-based back pressure; nothing to predict.
+                LOG.info("Connection " + conn.getIdentifier() + " has no backpressure object threshold.");
+                continue;
             }
+            LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is "
+                    + backPressureObjectThreshold);
+
+            StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(statusRepository
+                    .getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
+
+            // Model queued count as a linear function of x = (snapshot time - now) in ms. Centering
+            // on "now" keeps x small for numerical accuracy and makes the intercept the modeled
+            // queue count at the current instant.
+            SimpleRegression regression = new SimpleRegression();
+            for (StatusSnapshotDTO snap : connHistory.getAggregateSnapshots()) {
+                Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
+                if (snapQueuedCount == null || snap.getTimestamp() == null) {
+                    // Skip incomplete snapshots instead of failing with an unboxing NullPointerException.
+                    continue;
+                }
+                regression.addData(snap.getTimestamp().getTime() - now, snapQueuedCount);
+            }
+
+            if (regression.getN() < 2) {
+                LOG.info("Not enough data to model time to backpressure.");
+                continue;
+            }
+
+            // Skip this connection unless its queue is growing. getSlope() is NaN when all samples
+            // share one timestamp, and NaN must not be treated as growth.
+            final double slope = regression.getSlope();
+            if (Double.isNaN(slope) || slope <= 0) {
+                LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure.");
+                continue;
+            }
+
+            // Solve threshold = intercept + slope * x for x: the milliseconds from now until the
+            // modeled queue count reaches the threshold. A negative x means the model has already
+            // crossed it, so clamp to 0. Math.round saturates at Long.MAX_VALUE instead of overflowing.
+            final double millisUntilThreshold = (backPressureObjectThreshold - regression.getIntercept()) / slope;
+            final long connTimeToBackpressure = millisUntilThreshold <= 0 ? 0L : Math.round(millisUntilThreshold);
+            LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure);
+            minTimeToBackpressure = Math.min(minTimeToBackpressure, connTimeToBackpressure);
         }
 
-        return 0;
+        LOG.info("Min time to backpressure is: " + minTimeToBackpressure);
+        return minTimeToBackpressure;
     }
 }
