This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit b12f9d1a31a20393dccbab886eb7c5be61ee91d5
Author: Yolanda Davis <[email protected]>
AuthorDate: Fri Aug 2 08:05:24 2019 -0400

    NIFI-6150 Fixed NaN check and refactored time prediction. Switched to use non-caching engine for testing
---
 .../org/apache/nifi/controller/FlowController.java |  4 +-
 .../analytics/ConnectionStatusAnalytics.java       | 57 ++++++++++++----------
 2 files changed, 33 insertions(+), 28 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 141a6a4..33ad7f7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -150,7 +150,7 @@ import 
org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
 import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
-import 
org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine;
+import 
org.apache.nifi.controller.status.analytics.ConnectionStatusAnalyticsEngine;
 import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
@@ -594,7 +594,7 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
         }
 
         componentStatusRepository = createComponentStatusRepository();
-        analyticsEngine = new 
CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository);
+        analyticsEngine = new ConnectionStatusAnalyticsEngine(flowManager, 
componentStatusRepository);
         eventAccess = new StandardEventAccess(this, flowFileEventRepository);
 
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index 6b831fa..0cb6b5d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -73,10 +73,12 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
 
     public void refresh() {
 
-
         LOG.debug("Refreshing model with new data for connection id: {} ", 
connectionIdentifier);
-
-        this.queryWindow = new QueryWindow(System.currentTimeMillis() - 
getIntervalTimeMillis(), System.currentTimeMillis());
+        if (this.queryWindow == null) {
+            this.queryWindow = new QueryWindow(System.currentTimeMillis() - 
getIntervalTimeMillis(), System.currentTimeMillis());
+        } else if (supportsOnlineLearning()) {
+            this.queryWindow = new QueryWindow(queryWindow.getEndTimeMillis(), 
System.currentTimeMillis());
+        }
         modelMap.forEach((metric, modelFunction) -> {
 
             StatusAnalyticsModel model = modelFunction.getKey();
@@ -91,6 +93,18 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
         });
     }
 
+    protected Long getTimePrediction(Double prediction, Long timeMillis) {
+
+        if (Double.isNaN(prediction) || Double.isInfinite(prediction)) {
+            return -1L;
+        } else if (prediction < timeMillis) {
+            return 0L;
+        } else {
+            return Math.max(0, Math.round(prediction) - timeMillis);
+        }
+
+    }
+
     /**
      * Returns the predicted time (in milliseconds) when backpressure is 
expected to be applied to this connection, based on the total number of bytes 
in the queue.
      *
@@ -106,11 +120,7 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
         final String backPressureDataSize = 
connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
         final double backPressureBytes = 
DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
         final double prediction = bytesModel.predictX(backPressureBytes);
-        if (prediction != Double.NaN && prediction >= 
System.currentTimeMillis() && !Double.isInfinite(prediction)) {
-            return Math.max(0, Math.round(prediction) - 
System.currentTimeMillis());
-        } else {
-            return -1L;
-        }
+        return getTimePrediction(prediction, System.currentTimeMillis());
 
     }
 
@@ -127,13 +137,8 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
             throw new NoSuchElementException("Connection with the following id 
cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
         }
         final double backPressureCountThreshold = 
connection.getFlowFileQueue().getBackPressureObjectThreshold();
-        final Double prediction = 
countModel.predictX(backPressureCountThreshold);
-
-        if (prediction != Double.NaN && prediction >= 
System.currentTimeMillis() && !Double.isInfinite(prediction)) {
-            return Math.max(0, Math.round(prediction) - 
System.currentTimeMillis());
-        } else {
-            return -1L;
-        }
+        final double prediction = 
countModel.predictX(backPressureCountThreshold);
+        return getTimePrediction(prediction, System.currentTimeMillis());
     }
 
     /**
@@ -145,7 +150,7 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
     public Long getNextIntervalBytes() {
         final BivariateStatusAnalyticsModel bytesModel = 
(BivariateStatusAnalyticsModel) 
modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
         final Double prediction = bytesModel.predictY((double) 
System.currentTimeMillis() + getIntervalTimeMillis());
-        if (prediction != Double.NaN && prediction >= 0) {
+        if (!Double.isNaN(prediction) && prediction >= 0) {
             return Math.round(prediction);
         } else {
             return -1L;
@@ -161,14 +166,14 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
     public Long getNextIntervalCount() {
         final BivariateStatusAnalyticsModel countModel = 
(BivariateStatusAnalyticsModel) 
modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
         final Double prediction = countModel.predictY((double) 
System.currentTimeMillis() + getIntervalTimeMillis());
-        if (prediction != Double.NaN && prediction >= 0) {
+        if (!Double.isNaN(prediction) && prediction >= 0) {
             return Math.round(prediction);
         } else {
             return -1L;
         }
     }
 
-    public Long getNextIntervalPercentageUseCount(){
+    public Long getNextIntervalPercentageUseCount() {
 
         final Connection connection = getConnection();
         if (connection == null) {
@@ -177,15 +182,15 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
         final double backPressureCountThreshold = 
connection.getFlowFileQueue().getBackPressureObjectThreshold();
         final long nextIntervalCount = getNextIntervalCount();
 
-        if(nextIntervalCount > -1L) {
+        if (nextIntervalCount > -1L) {
             return Math.round((getNextIntervalCount() / 
backPressureCountThreshold) * 100);
-        }else{
+        } else {
             return -1L;
         }
 
     }
 
-    public Long getNextIntervalPercentageUseBytes(){
+    public Long getNextIntervalPercentageUseBytes() {
 
         final Connection connection = getConnection();
         if (connection == null) {
@@ -195,14 +200,14 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
         final double backPressureBytes = 
DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
         final long nextIntervalBytes = getNextIntervalBytes();
 
-        if(nextIntervalBytes > -1L) {
+        if (nextIntervalBytes > -1L) {
             return Math.round((getNextIntervalBytes() / backPressureBytes) * 
100);
-        }else{
+        } else {
             return -1L;
         }
     }
 
-    public Long getIntervalTimeMillis(){
+    public Long getIntervalTimeMillis() {
         return (5L * 60 * 1000);
     }
 
@@ -226,8 +231,8 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
         predictions.put("nextIntervalPercentageUseBytes", 
getNextIntervalPercentageUseBytes());
         predictions.put("intervalTimeMillis", getIntervalTimeMillis());
 
-        predictions.forEach((key,value) -> {
-            LOG.debug("Prediction model for connection id {}: {}={} ", 
connectionIdentifier,key,value);
+        predictions.forEach((key, value) -> {
+            LOG.debug("Prediction model for connection id {}: {}={} ", 
connectionIdentifier, key, value);
         });
 
         return predictions;

Reply via email to