This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 9761ee1b4d19f509d39f4d53eb32e620fdfe28d7 Author: Matthew Burgess <[email protected]> AuthorDate: Wed Jul 24 13:06:51 2019 -0400 NIFI-6510 Revert "DFA-9 Remove redundant connection prediction interfaces as we can just use ConnectionStatusAnalytics directly" This reverts commit 5b9fead1471059098c0e98343fb337070f1c75c1. --- .../status/analytics/StatusAnalytics.java | 28 ++++++++++++++++++++++ .../analytics/CachingStatusAnalyticEngine.java | 20 ++++++++++++++++ .../status/analytics/StatusAnalyticEngine.java | 22 +++++++++++++++++ .../apache/nifi/reporting/StandardEventAccess.java | 10 ++++---- 4 files changed, 74 insertions(+), 6 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java index 131531f..45e1c12 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java @@ -28,4 +28,32 @@ public interface StatusAnalytics { * @return A ConnectionStatusAnalytics object */ ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId); + + /** + * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue. + * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue. + * @param connectionId + */ + long getTimeToBytesBackpressureMillis(String connectionId); + + /** + * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue. + * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue. 
+ * @param connectionId + */ + long getTimeToCountBackpressureMillis(String connectionId); + + /** + * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.). + * @return the predicted total number of bytes in the queue at the next configured interval. + * @param connectionId + */ + long getNextIntervalBytes(String connectionId); + + /** + * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.). + * @return the predicted number of objects in the queue at the next configured interval. + * @param connectionId + */ + int getNextIntervalCount(String connectionId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java index 015d6f8..864a5d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java @@ -51,6 +51,26 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics { return cachedResult; } + @Override + public long getTimeToBytesBackpressureMillis(String connectionId) { + return 0; + } + + @Override + public long getTimeToCountBackpressureMillis(String connectionId) { + return getConnectionStatusAnalytics(connectionId).getTimeToCountBackpressureMillis(); + } + + @Override + public long getNextIntervalBytes(String connectionId) { + return 0; + } + + @Override + public int getNextIntervalCount(String 
connectionId) { + return 0; + } + protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn){ long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java index 56c263e..5a873d5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java @@ -158,6 +158,7 @@ public class StatusAnalyticEngine implements StatusAnalytics { public long getMinTimeToBackpressureMillis() { ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); List<Connection> allConnections = rootGroup.findAllConnections(); + rootGroup.findConnection("asdf"); long minTimeToBackpressure = Long.MAX_VALUE; for (Connection conn : allConnections) { @@ -168,4 +169,25 @@ public class StatusAnalyticEngine implements StatusAnalytics { LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure)); return minTimeToBackpressure; } + + // TODO - populate the prediction fields. Do we need to pass in connection ID? 
+ @Override + public long getTimeToCountBackpressureMillis(String connectionId) { + return 0; + } + + @Override + public long getTimeToBytesBackpressureMillis(String connectionId) { + return 0; + } + + @Override + public long getNextIntervalBytes(String connectionId) { + return 0; + } + + @Override + public int getNextIntervalCount(String connectionId) { + return 0; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index aeb9559..87fcd4d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -51,7 +51,6 @@ import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.RunStatus; import org.apache.nifi.controller.status.TransmissionStatus; -import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics; import org.apache.nifi.controller.status.analytics.StatusAnalytics; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; @@ -343,11 +342,10 @@ public class StandardEventAccess implements UserAwareEventAccess { } if (statusAnalytics != null) { - ConnectionStatusAnalytics connectionStatusAnalytics = statusAnalytics.getConnectionStatusAnalytics(conn.getIdentifier()); - connStatus.setPredictedTimeToBytesBackpressureMillis(connectionStatusAnalytics.getTimeToBytesBackpressureMillis()); - connStatus.setPredictedTimeToCountBackpressureMillis(connectionStatusAnalytics.getTimeToCountBackpressureMillis()); - 
connStatus.setNextPredictedQueuedBytes(connectionStatusAnalytics.getNextIntervalBytes()); - connStatus.setNextPredictedQueuedCount(connectionStatusAnalytics.getNextIntervalCount()); + connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis(conn.getIdentifier())); + connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis(conn.getIdentifier())); + connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes(conn.getIdentifier())); + connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount(conn.getIdentifier())); } if (isConnectionAuthorized) {
