This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit b12f9d1a31a20393dccbab886eb7c5be61ee91d5 Author: Yolanda Davis <[email protected]> AuthorDate: Fri Aug 2 08:05:24 2019 -0400 NIFI-6150 Fixed NaN check and refactored time prediction. Switched to use non caching engine for testing --- .../org/apache/nifi/controller/FlowController.java | 4 +- .../analytics/ConnectionStatusAnalytics.java | 57 ++++++++++++---------- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 141a6a4..33ad7f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -150,7 +150,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; import org.apache.nifi.controller.state.server.ZooKeeperStateServer; -import org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine; +import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalyticsEngine; import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.GarbageCollectionHistory; @@ -594,7 +594,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } componentStatusRepository = createComponentStatusRepository(); - analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository); + analyticsEngine = new ConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository); eventAccess = new StandardEventAccess(this, flowFileEventRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java index 6b831fa..0cb6b5d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -73,10 +73,12 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { public void refresh() { - LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier); - - this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis()); + if (this.queryWindow == null) { + this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis()); + } else if (supportsOnlineLearning()) { + this.queryWindow = new QueryWindow(queryWindow.getEndTimeMillis(), System.currentTimeMillis()); + } modelMap.forEach((metric, modelFunction) -> { StatusAnalyticsModel model = modelFunction.getKey(); @@ -91,6 +93,18 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { }); } + protected Long getTimePrediction(Double prediction, Long timeMillis) { + + if (Double.isNaN(prediction) || Double.isInfinite(prediction)) { + return -1L; + } else if (prediction < timeMillis) { + return 0L; + } else { + return Math.max(0, Math.round(prediction) - timeMillis); + } + + } + /** * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue. * @@ -106,11 +120,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold(); final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B); final double prediction = bytesModel.predictX(backPressureBytes); - if (prediction != Double.NaN && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) { - return Math.max(0, Math.round(prediction) - System.currentTimeMillis()); - } else { - return -1L; - } + return getTimePrediction(prediction, System.currentTimeMillis()); } @@ -127,13 +137,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!"); } final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold(); - final Double prediction = countModel.predictX(backPressureCountThreshold); - - if (prediction != Double.NaN && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) { - return Math.max(0, Math.round(prediction) - System.currentTimeMillis()); - } else { - return -1L; - } + final double prediction = countModel.predictX(backPressureCountThreshold); + return getTimePrediction(prediction, System.currentTimeMillis()); } /** @@ -145,7 +150,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { public Long getNextIntervalBytes() { final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey(); final Double prediction = bytesModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis()); - if (prediction != Double.NaN && prediction >= 0) { + if (!Double.isNaN(prediction) && prediction >= 0) { return Math.round(prediction); } else { return -1L; @@ -161,14 +166,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { public Long getNextIntervalCount() { final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey(); final Double prediction = countModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis()); - if (prediction != Double.NaN && prediction >= 0) { + if (!Double.isNaN(prediction) && prediction >= 0) { return Math.round(prediction); } else { return -1L; } } - public Long getNextIntervalPercentageUseCount(){ + public Long getNextIntervalPercentageUseCount() { final Connection connection = getConnection(); if (connection == null) { @@ -177,15 +182,15 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold(); final long nextIntervalCount = getNextIntervalCount(); - if(nextIntervalCount > -1L) { + if (nextIntervalCount > -1L) { return Math.round((getNextIntervalCount() / backPressureCountThreshold) * 100); - }else{ + } else { return -1L; } } - public Long getNextIntervalPercentageUseBytes(){ + public Long getNextIntervalPercentageUseBytes() { final Connection connection = getConnection(); if (connection == null) { @@ -195,14 +200,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B); final long nextIntervalBytes = getNextIntervalBytes(); - if(nextIntervalBytes > -1L) { + if (nextIntervalBytes > -1L) { return Math.round((getNextIntervalBytes() / backPressureBytes) * 100); - }else{ + } else { return -1L; } } - public Long getIntervalTimeMillis(){ + public Long getIntervalTimeMillis() { return (5L * 60 * 1000); } @@ -226,8 +231,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { predictions.put("nextIntervalPercentageUseBytes", getNextIntervalPercentageUseBytes()); predictions.put("intervalTimeMillis", getIntervalTimeMillis()); - predictions.forEach((key,value) -> { - LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value); + predictions.forEach((key, value) -> { + LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier, key, value); }); return predictions;
