This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 8810f9c7b96a9bd3833eda703085e7938ff0a312
Author: Yolanda Davis <[email protected]>
AuthorDate: Tue Aug 20 19:59:58 2019 -0400

    NIFI-6566 - Refactor to decouple model instance from status analytics 
object. Also allow configurable model from nifi.properties
    
    NIFI-6566 - changes to allow scoring configurations for model in 
nifi.properties
    
    NIFI-6566 - added default implementation value to NiFiProperties
    
    NIFI-6566 - correction to default variable name in NiFiProperties, removed 
unnecessary init method from ConnectionStatusAnalytics
    
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #3663
---
 .../status/analytics/StatusAnalyticsModel.java     |   4 +
 .../java/org/apache/nifi/util/NiFiProperties.java  |   7 +-
 .../resources/NiFiProperties/conf/nifi.properties  |   3 +-
 .../src/main/asciidoc/administration-guide.adoc    |   4 +
 .../org/apache/nifi/controller/FlowController.java |  27 ++-
 .../CachingConnectionStatusAnalyticsEngine.java    |  27 ++-
 .../analytics/ConnectionStatusAnalytics.java       | 185 +++++++++------------
 .../analytics/ConnectionStatusAnalyticsEngine.java |  26 ++-
 .../analytics/StatusAnalyticsModelMapFactory.java  | 106 ++++++++++++
 ...Model.java => StatusMetricExtractFunction.java} |  20 +--
 .../models/BivariateStatusAnalyticsModel.java      |  17 +-
 .../models/MultivariateStatusAnalyticsModel.java   |  50 ------
 ...tSquaresMSAM.java => OrdinaryLeastSquares.java} |  29 ++--
 ...leRegressionBSAM.java => SimpleRegression.java} |  32 ++--
 ...ontroller.status.analytics.StatusAnalyticsModel |  16 ++
 ...TestCachingConnectionStatusAnalyticsEngine.java |  19 ++-
 .../analytics/TestConnectionStatusAnalytics.java   |  25 ++-
 .../TestConnectionStatusAnalyticsEngine.java       |   8 +-
 .../analytics/TestStatusAnalyticsEngine.java       |  29 +++-
 .../TestStatusAnalyticsModelMapFactory.java        |  66 ++++++++
 ...uresMSAM.java => TestOrdinaryLeastSqaures.java} |  37 ++---
 ...gressionBSAM.java => TestSimpleRegression.java} |  10 +-
 .../src/test/resources/conf/nifi.properties        |   5 +-
 .../nar/StandardExtensionDiscoveringManager.java   |  34 ++--
 .../nifi-framework/nifi-resources/pom.xml          |   3 +
 .../src/main/resources/conf/nifi.properties        |   5 +-
 26 files changed, 497 insertions(+), 297 deletions(-)

diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
index 4869a0f..8d601e6 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
@@ -16,12 +16,16 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import java.util.Map;
 import java.util.stream.Stream;
 
 public interface StatusAnalyticsModel {
 
     void learn(Stream<Double[]> features, Stream<Double> labels);
     Double predict(Double[] feature);
+    Double predictVariable(Integer predictVariableIndex, Map<Integer,Double> 
knownVariablesWithIndex, Double label);
     Boolean supportsOnlineLearning();
+    Map<String,Double> getScores();
+    void clear();
 
 }
diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index aa00793..32e273e 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -240,6 +240,9 @@ public abstract class NiFiProperties {
 
     // analytics properties
     public static final String ANALYTICS_PREDICTION_INTERVAL = 
"nifi.analytics.predict.interval";
+    public static final String ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = 
"nifi.analytics.connection.model.implementation";
+    public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME= 
"nifi.analytics.connection.model.score.name";
+    public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = 
"nifi.analytics.connection.model.score.threshold";
 
     // defaults
     public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
@@ -313,7 +316,9 @@ public abstract class NiFiProperties {
 
     // analytics defaults
     public static final String DEFAULT_ANALYTICS_PREDICTION_INTERVAL = "3 
mins";
-
+    public final static String 
DEFAULT_ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = 
"org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares";
+    public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = 
"rSquared";
+    public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = 
.90;
 
     /**
      * Retrieves the property value for the given property key.
diff --git 
a/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties
 
b/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties
index 9ab3d99..d3bd5d7 100644
--- 
a/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties
+++ 
b/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties
@@ -122,4 +122,5 @@ nifi.cluster.manager.protocol.threads=10
 nifi.cluster.manager.safemode.duration=0 sec
 
 # analytics properties #
-nifi.analytics.predict.interval=3 mins
\ No newline at end of file
+nifi.analytics.predict.interval=3 mins
+nifi.analytics.connection.model.implementation=org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares
\ No newline at end of file
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 6fccde9..0b2fd8c 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -3325,6 +3325,10 @@ These properties determine the behavior of the internal 
NiFi Analytics capabilit
 |====
 |*Property*|*Description*
 |`nifi.analytics.predict.interval`|This indicates a time interval for which 
analytical predictions (queue saturation, e.g.) should be made. The default 
value is `3 mins`.
+|`nifi.analytics.connection.model.implementation`|This is the implementation 
class for the status analytics model used to make connection predictions.  The 
default value is 
`org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares`.
+|`nifi.analytics.connection.model.score.name`|This is the name of the scoring 
type that should be used to evaluate model.  The default value is `rSquared`.
+|`nifi.analytics.connection.model.score.threshold`|This is the threshold for 
the scoring value (where score should be above given threshold).  The default 
value is `.9`.
+
 |====
 
 [[custom_properties]]
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 55ff0bb..eeb9f82 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -149,8 +149,11 @@ import 
org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
 import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
-import 
org.apache.nifi.controller.status.analytics.ConnectionStatusAnalyticsEngine;
+import 
org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine;
 import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
+import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel;
+import 
org.apache.nifi.controller.status.analytics.StatusAnalyticsModelMapFactory;
+import org.apache.nifi.controller.status.analytics.StatusMetricExtractFunction;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
 import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
@@ -210,6 +213,7 @@ import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
+import org.apache.nifi.util.Tuple;
 import org.apache.nifi.util.concurrency.TimedLock;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
@@ -602,8 +606,25 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
             predictionIntervalMillis = 
FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL,
 TimeUnit.MILLISECONDS);
         }
 
+        // Determine score name to use for evaluating model performance
+        String modelScoreName = 
nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_NAME,
 NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME);
+
+        // Determine score threshold to use when evaluating acceptable model
+        Double modelScoreThreshold;
+        try {
+            modelScoreThreshold = 
Double.valueOf(nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_NAME,
 NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL));
+        }catch (final Exception e) {
+            modelScoreThreshold = 
Double.valueOf(NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD);
+        }
+
         componentStatusRepository = createComponentStatusRepository();
-        analyticsEngine = new ConnectionStatusAnalyticsEngine(flowManager, 
componentStatusRepository, flowFileEventRepository, predictionIntervalMillis);
+
+        final Map<String, Tuple<StatusAnalyticsModel, 
StatusMetricExtractFunction>> modelMap = StatusAnalyticsModelMapFactory
+                                                                               
                     .getConnectionStatusModelMap(extensionManager, 
nifiProperties);
+
+        analyticsEngine = new 
CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, 
flowFileEventRepository, modelMap,
+                                                                
predictionIntervalMillis, modelScoreName, modelScoreThreshold);
+
         eventAccess = new StandardEventAccess(this, flowFileEventRepository);
 
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@@ -1042,8 +1063,6 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
     }
 
 
-
-
     public KerberosConfig createKerberosConfig(final NiFiProperties 
nifiProperties) {
         final String principal = nifiProperties.getKerberosServicePrincipal();
         final String keytabLocation = 
nifiProperties.getKerberosServiceKeytabLocation();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
index 95f655e..0cda68c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
@@ -16,49 +16,44 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 
-public class CachingConnectionStatusAnalyticsEngine implements 
StatusAnalyticsEngine {
-    private final ComponentStatusRepository statusRepository;
-    private final FlowManager flowManager;
-    private final FlowFileEventRepository flowFileEventRepository;
-    private volatile Cache<String, ConnectionStatusAnalytics> cache;
-    private final long predictionIntervalMillis;
+public class CachingConnectionStatusAnalyticsEngine extends 
ConnectionStatusAnalyticsEngine{
+    private volatile Cache<String, StatusAnalytics> cache;
     private static final Logger LOG = 
LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class);
 
     public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, 
ComponentStatusRepository statusRepository,
-                                                  FlowFileEventRepository 
flowFileEventRepository, long predictionIntervalMillis) {
-        this.flowManager = flowManager;
-        this.statusRepository = statusRepository;
-        this.flowFileEventRepository = flowFileEventRepository;
+            FlowFileEventRepository flowFileEventRepository, Map<String, 
Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
+            long predictionIntervalMillis, String scoreName, double 
scoreThreshold) {
+
+        
super(flowManager,statusRepository,flowFileEventRepository,modelMap,predictionIntervalMillis,scoreName,scoreThreshold);
         this.cache = Caffeine.newBuilder()
                 .expireAfterWrite(30, TimeUnit.MINUTES)
                 .build();
-        this.predictionIntervalMillis = predictionIntervalMillis;
     }
 
     @Override
     public StatusAnalytics getStatusAnalytics(String identifier) {
 
-        ConnectionStatusAnalytics connectionStatusAnalytics = 
cache.getIfPresent(identifier);
+        StatusAnalytics connectionStatusAnalytics = 
cache.getIfPresent(identifier);
         if (connectionStatusAnalytics == null) {
             LOG.debug("Creating new status analytics object for connection id: 
{}", identifier);
-            connectionStatusAnalytics = new 
ConnectionStatusAnalytics(statusRepository, 
flowManager,flowFileEventRepository, identifier, true);
-            
connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis);
-            connectionStatusAnalytics.init();
+            connectionStatusAnalytics = super.getStatusAnalytics(identifier);
             cache.put(identifier, connectionStatusAnalytics);
         } else {
             LOG.debug("Pulled existing analytics from cache for connection id: 
{}", identifier);
-            connectionStatusAnalytics.refresh();
+            ((ConnectionStatusAnalytics)connectionStatusAnalytics).refresh();
         }
         return connectionStatusAnalytics;
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index 313e7ab..bc2270b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
-import java.util.Random;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -31,18 +30,11 @@ import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.repository.FlowFileEvent;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.repository.RepositoryStatusReport;
-import 
org.apache.nifi.controller.status.analytics.models.MultivariateStatusAnalyticsModel;
-import 
org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquaresMSAM;
-import 
org.apache.nifi.controller.status.analytics.models.VariateStatusAnalyticsModel;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
-import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
 import org.apache.nifi.controller.status.history.StatusHistory;
-import org.apache.nifi.controller.status.history.StatusHistoryUtil;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.util.Tuple;
-import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +43,7 @@ import com.google.common.primitives.Doubles;
 public class ConnectionStatusAnalytics implements StatusAnalytics {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionStatusAnalytics.class);
-    private Map<String, Tuple<StatusAnalyticsModel, ExtractFunction>> modelMap;
+    private final Map<String, Tuple<StatusAnalyticsModel, 
StatusMetricExtractFunction>> modelMap;
     private QueryWindow queryWindow;
     private final ComponentStatusRepository componentStatusRepository;
     private final FlowFileEventRepository flowFileEventRepository;
@@ -60,47 +52,32 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
     private final Boolean supportOnlineLearning;
     private Boolean extendWindow = false;
     private long intervalMillis = 3L * 60 * 1000; // Default is 3 minutes
-    private static double SCORE_THRESHOLD = .90;
+    private String scoreName = "rSquared";
+    private double scoreThreshold = .90;
 
-    public ConnectionStatusAnalytics(ComponentStatusRepository 
componentStatusRepository, FlowManager flowManager, FlowFileEventRepository 
flowFileEventRepository, String connectionIdentifier,
-                                     Boolean supportOnlineLearning) {
+    public ConnectionStatusAnalytics(ComponentStatusRepository 
componentStatusRepository, FlowManager flowManager, FlowFileEventRepository 
flowFileEventRepository,
+                                     Map<String, Tuple<StatusAnalyticsModel, 
StatusMetricExtractFunction>> modelMap, String connectionIdentifier, Boolean 
supportOnlineLearning) {
         this.componentStatusRepository = componentStatusRepository;
         this.flowManager = flowManager;
         this.flowFileEventRepository = flowFileEventRepository;
+        this.modelMap = modelMap;
         this.connectionIdentifier = connectionIdentifier;
         this.supportOnlineLearning = supportOnlineLearning;
     }
 
-    public void init() {
-
-        LOG.debug("Initialize analytics connection id: {} ", 
connectionIdentifier);
-
-        if (this.modelMap == null || this.modelMap.isEmpty()) {
-            Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = 
new Tuple<>(new OrdinaryLeastSquaresMSAM(), extract);
-            Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = 
new Tuple<>(new OrdinaryLeastSquaresMSAM(), extract);
-            this.modelMap = new HashMap<>();
-            //TODO: Should change keys used here
-            
this.modelMap.put(ConnectionStatusDescriptor.QUEUED_COUNT.getField(), 
countModelFunction);
-            
this.modelMap.put(ConnectionStatusDescriptor.QUEUED_BYTES.getField(), 
byteModelFunction);
-        }
-
-        refresh();
-    }
-
     public void refresh() {
 
-        if (this.queryWindow == null) {
-            //Set query window to fresh value
-            this.queryWindow = new QueryWindow(System.currentTimeMillis() - 
getIntervalTimeMillis(), System.currentTimeMillis());
-        } else if (supportOnlineLearning) {
+        if (supportOnlineLearning && this.queryWindow != null) {
             //Obtain latest observations when available, extend window if 
needed to obtain minimum observations
             this.queryWindow = new QueryWindow(extendWindow ? 
queryWindow.getStartTimeMillis() : queryWindow.getEndTimeMillis(), 
System.currentTimeMillis());
-
+        } else {
+            this.queryWindow = new QueryWindow(System.currentTimeMillis() - 
getIntervalTimeMillis(), System.currentTimeMillis());
         }
+
         modelMap.forEach((metric, modelFunction) -> {
 
             StatusAnalyticsModel model = modelFunction.getKey();
-            ExtractFunction extract = modelFunction.getValue();
+            StatusMetricExtractFunction extract = modelFunction.getValue();
             StatusHistory statusHistory = 
componentStatusRepository.getConnectionStatusHistory(connectionIdentifier, 
queryWindow.getStartDateTime(), queryWindow.getEndDateTime(), 
Integer.MAX_VALUE);
             Tuple<Stream<Double[]>, Stream<Double>> modelData = 
extract.extractMetric(metric, statusHistory);
             Double[][] features = modelData.getKey().toArray(size -> new 
Double[size][1]);
@@ -129,7 +106,7 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
      */
     public Long getTimeToBytesBackpressureMillis() {
 
-        final MultivariateStatusAnalyticsModel bytesModel = 
(MultivariateStatusAnalyticsModel) 
modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
+        final StatusAnalyticsModel bytesModel = 
modelMap.get("queuedBytes").getKey();
         FlowFileEvent flowFileEvent = getStatusReport();
 
         final Connection connection = getConnection();
@@ -139,12 +116,12 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
         final String backPressureDataSize = 
connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
         final double backPressureBytes = 
DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
 
-        if(validModel(bytesModel) && flowFileEvent != null) {
-            List<Tuple<Integer, Double>> predictFeatures = new ArrayList<>();
-            Double inOutRatio = (flowFileEvent.getContentSizeOut() / 
(double)flowFileEvent.getContentSizeIn());
-            predictFeatures.add(new Tuple<>(1, inOutRatio));
-            return getTimePrediction(bytesModel.predictVariable(0, 
predictFeatures, backPressureBytes), System.currentTimeMillis());
-        }else{
+        if (validModel(bytesModel) && flowFileEvent != null) {
+            Map<Integer, Double> predictFeatures = new HashMap<>();
+            Double inOutRatio = (flowFileEvent.getContentSizeOut() / (double) 
flowFileEvent.getContentSizeIn());
+            predictFeatures.put(1, inOutRatio);
+            return convertTimePrediction(bytesModel.predictVariable(0, 
predictFeatures, backPressureBytes), System.currentTimeMillis());
+        } else {
             return -1L;
         }
     }
@@ -156,7 +133,7 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
      */
     public Long getTimeToCountBackpressureMillis() {
 
-        final MultivariateStatusAnalyticsModel countModel = 
(MultivariateStatusAnalyticsModel) 
modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
+        final StatusAnalyticsModel countModel = 
modelMap.get("queuedCount").getKey();
         FlowFileEvent flowFileEvent = getStatusReport();
 
         final Connection connection = getConnection();
@@ -166,12 +143,12 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
 
         final double backPressureCountThreshold = 
connection.getFlowFileQueue().getBackPressureObjectThreshold();
 
-        if(validModel(countModel) && flowFileEvent != null) {
-            List<Tuple<Integer, Double>> predictFeatures = new ArrayList<>();
-            Double inOutRatio = (flowFileEvent.getFlowFilesOut() / 
(double)flowFileEvent.getFlowFilesIn());
-            predictFeatures.add(new Tuple<>(1, inOutRatio));
-            return getTimePrediction(countModel.predictVariable(0, 
predictFeatures, backPressureCountThreshold), System.currentTimeMillis());
-        }else{
+        if (validModel(countModel) && flowFileEvent != null) {
+            Map<Integer, Double> predictFeatures = new HashMap<>();
+            Double inOutRatio = (flowFileEvent.getFlowFilesOut() / (double) 
flowFileEvent.getFlowFilesIn());
+            predictFeatures.put(1, inOutRatio);
+            return convertTimePrediction(countModel.predictVariable(0, 
predictFeatures, backPressureCountThreshold), System.currentTimeMillis());
+        } else {
             return -1L;
         }
     }
@@ -183,17 +160,17 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
      */
 
     public Long getNextIntervalBytes() {
-        final VariateStatusAnalyticsModel bytesModel = 
(VariateStatusAnalyticsModel) 
modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
+        final StatusAnalyticsModel bytesModel = 
modelMap.get("queuedBytes").getKey();
         FlowFileEvent flowFileEvent = getStatusReport();
 
-        if(validModel(bytesModel) && flowFileEvent != null) {
+        if (validModel(bytesModel) && flowFileEvent != null) {
             List<Double> predictFeatures = new ArrayList<>();
-            Long nextInterval =  System.currentTimeMillis() + 
getIntervalTimeMillis();
-            Double inOutRatio = flowFileEvent.getContentSizeOut() / 
(double)flowFileEvent.getContentSizeIn();
+            Long nextInterval = System.currentTimeMillis() + 
getIntervalTimeMillis();
+            Double inOutRatio = flowFileEvent.getContentSizeOut() / (double) 
flowFileEvent.getContentSizeIn();
             predictFeatures.add(nextInterval.doubleValue());
             predictFeatures.add(inOutRatio);
-            return  (bytesModel.predict(predictFeatures.toArray(new 
Double[2]))).longValue();
-        }else{
+            return 
convertCountPrediction(bytesModel.predict(predictFeatures.toArray(new 
Double[2])));
+        } else {
             return -1L;
         }
     }
@@ -205,17 +182,17 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
      */
 
     public Long getNextIntervalCount() {
-        final VariateStatusAnalyticsModel countModel = 
(VariateStatusAnalyticsModel) 
modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
+        final StatusAnalyticsModel countModel = 
modelMap.get("queuedCount").getKey();
         FlowFileEvent flowFileEvent = getStatusReport();
 
-        if(validModel(countModel) && flowFileEvent != null) {
+        if (validModel(countModel) && flowFileEvent != null) {
             List<Double> predictFeatures = new ArrayList<>();
-            Long nextInterval =  System.currentTimeMillis() + 
getIntervalTimeMillis();
-            Double inOutRatio = flowFileEvent.getFlowFilesOut()/ 
(double)flowFileEvent.getFlowFilesIn();
+            Long nextInterval = System.currentTimeMillis() + 
getIntervalTimeMillis();
+            Double inOutRatio = flowFileEvent.getFlowFilesOut() / (double) 
flowFileEvent.getFlowFilesIn();
             predictFeatures.add(nextInterval.doubleValue());
             predictFeatures.add(inOutRatio);
-            return (countModel.predict(predictFeatures.toArray(new 
Double[2]))).longValue();
-        }else{
+            return 
convertCountPrediction(countModel.predict(predictFeatures.toArray(new 
Double[2])));
+        } else {
             return -1L;
         }
 
@@ -263,6 +240,22 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
         this.intervalMillis = intervalTimeMillis;
     }
 
+    public String getScoreName() {
+        return scoreName;
+    }
+
+    public void setScoreName(String scoreName) {
+        this.scoreName = scoreName;
+    }
+
+    public double getScoreThreshold() {
+        return scoreThreshold;
+    }
+
+    public void setScoreThreshold(double scoreThreshold) {
+        this.scoreThreshold = scoreThreshold;
+    }
+
     @Override
     public QueryWindow getQueryWindow() {
         return queryWindow;
@@ -301,75 +294,47 @@ public class ConnectionStatusAnalytics implements 
StatusAnalytics {
         return connection.orElse(null);
     }
 
-    private FlowFileEvent getStatusReport(){
-        RepositoryStatusReport statusReport =  
flowFileEventRepository.reportTransferEvents(System.currentTimeMillis());
+    private FlowFileEvent getStatusReport() {
+        RepositoryStatusReport statusReport = 
flowFileEventRepository.reportTransferEvents(System.currentTimeMillis());
         return statusReport.getReportEntry(this.connectionIdentifier);
     }
 
-    private interface ExtractFunction {
-        Tuple<Stream<Double[]>, Stream<Double>> extractMetric(String metric, 
StatusHistory statusHistory);
+    private Long convertTimePrediction(Double prediction, Long timeMillis) {
+        if (Double.isNaN(prediction) || Double.isInfinite(prediction) || 
prediction < timeMillis) {
+            return -1L;
+        } else {
+            return Math.max(0, Math.round(prediction) - timeMillis);
+        }
     }
 
-    private Long getTimePrediction(Double prediction, Long timeMillis) {
-
-        if (Double.isNaN(prediction) || Double.isInfinite(prediction)) {
+    private Long convertCountPrediction(Double prediction) {
+        if (Double.isNaN(prediction) || Double.isInfinite(prediction) || 
prediction < 0) {
             return -1L;
-        } else if (prediction < timeMillis) {
-            return 0L;
         } else {
-            return Math.max(0, Math.round(prediction) - timeMillis);
+            return Math.max(0, Math.round(prediction));
         }
     }
 
-    private boolean validModel(VariateStatusAnalyticsModel model){
+    private boolean validModel(StatusAnalyticsModel model) {
 
-        Double rSquared = model.getRSquared();
+        Double score = getScore(model);
 
-        if (rSquared == null || (Doubles.isFinite(rSquared) && 
!Double.isNaN(rSquared) && rSquared < SCORE_THRESHOLD)) {
-            if(supportOnlineLearning && model.supportsOnlineLearning()){
+        if (score == null || (Doubles.isFinite(score) && !Double.isNaN(score) 
&& score < scoreThreshold)) {
+            if (supportOnlineLearning && model.supportsOnlineLearning()) {
                 model.clear();
             }
             return false;
-        }else {
+        } else {
             return true;
         }
     }
 
-    private final ExtractFunction extract = (metric, statusHistory) -> {
-
-        List<Double> values = new ArrayList<>();
-        List<Double[]> features = new ArrayList<>();
-        Random rand = new Random();
-        StatusHistoryDTO statusHistoryDTO = 
StatusHistoryUtil.createStatusHistoryDTO(statusHistory);
-
-        for (StatusSnapshotDTO snap : 
statusHistoryDTO.getAggregateSnapshots()) {
-            List<Double> featureArray = new ArrayList<>();
-            Long snapValue = snap.getStatusMetrics().get(metric);
-            long snapTime = snap.getTimestamp().getTime();
-
-            featureArray.add((double) snapTime);
-            Double randomError = + (rand.nextInt(1000) * .0000001);
-            if 
(metric.equals(ConnectionStatusDescriptor.QUEUED_COUNT.getField())) {
-
-                Long inputCount = 
snap.getStatusMetrics().get(ConnectionStatusDescriptor.INPUT_COUNT.getField());
-                Long outputCount = 
snap.getStatusMetrics().get(ConnectionStatusDescriptor.OUTPUT_COUNT.getField());
-                Double inOutRatio = ((double)outputCount /(double)inputCount) 
+ randomError;
-                featureArray.add(Double.isNaN(inOutRatio)? randomError : 
inOutRatio);
-
-            } else {
-                Long inputBytes = 
snap.getStatusMetrics().get(ConnectionStatusDescriptor.INPUT_BYTES.getField());
-                Long outputBytes = 
snap.getStatusMetrics().get(ConnectionStatusDescriptor.OUTPUT_BYTES.getField());
-                Double inOutRatio = ((double)outputBytes/(double)inputBytes) + 
randomError;
-                featureArray.add(Double.isNaN(inOutRatio)? randomError : 
inOutRatio);
-            }
-
-            values.add((double) snapValue);
-            features.add(featureArray.toArray(new 
Double[featureArray.size()]));
-
+    private Double getScore(StatusAnalyticsModel model) {
+        if (model != null && model.getScores() != null) {
+            return model.getScores().get(scoreName);
+        } else {
+            return null;
         }
-        return new Tuple<Stream<Double[]>, Stream<Double>>(features.stream(), 
values.stream());
-
-    };
-
+    }
 
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
index a9ba4ea..de815fa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
@@ -16,31 +16,43 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import java.util.Map;
+
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
     private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class);
-    private final ComponentStatusRepository statusRepository;
-    private final FlowManager flowManager;
-    private final FlowFileEventRepository flowFileEventRepository;
-    private final long predictionIntervalMillis;
+    protected final ComponentStatusRepository statusRepository;
+    protected final FlowManager flowManager;
+    protected final FlowFileEventRepository flowFileEventRepository;
+    protected final Map<String, Tuple<StatusAnalyticsModel, 
StatusMetricExtractFunction>> modelMap;
+    protected final long predictionIntervalMillis;
+    protected final String scoreName;
+    protected final double scoreThreshold;
 
-    public ConnectionStatusAnalyticsEngine(FlowManager flowManager, 
ComponentStatusRepository statusRepository, FlowFileEventRepository 
flowFileEventRepository, long predictionIntervalMillis) {
+    public ConnectionStatusAnalyticsEngine(FlowManager flowManager, 
ComponentStatusRepository statusRepository, FlowFileEventRepository 
flowFileEventRepository,
+            Map<String, Tuple<StatusAnalyticsModel, 
StatusMetricExtractFunction>> modelMap, long predictionIntervalMillis, String 
scoreName, double scoreThreshold) {
         this.flowManager = flowManager;
         this.statusRepository = statusRepository;
         this.flowFileEventRepository = flowFileEventRepository;
         this.predictionIntervalMillis = predictionIntervalMillis;
+        this.modelMap = modelMap;
+        this.scoreName = scoreName;
+        this.scoreThreshold = scoreThreshold;
     }
 
     @Override
     public StatusAnalytics getStatusAnalytics(String identifier) {
-        ConnectionStatusAnalytics connectionStatusAnalytics = new 
ConnectionStatusAnalytics(statusRepository, flowManager, 
flowFileEventRepository, identifier, false);
+        ConnectionStatusAnalytics connectionStatusAnalytics = new 
ConnectionStatusAnalytics(statusRepository, flowManager, 
flowFileEventRepository, modelMap, identifier, false);
         
connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis);
-        connectionStatusAnalytics.init();
+        connectionStatusAnalytics.setScoreName(scoreName);
+        connectionStatusAnalytics.setScoreThreshold(scoreThreshold);
+        connectionStatusAnalytics.refresh();
         return connectionStatusAnalytics;
     }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModelMapFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModelMapFactory.java
new file mode 100644
index 0000000..b8e6775
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModelMapFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarThreadContextClassLoader;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.Tuple;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+
+public class StatusAnalyticsModelMapFactory {
+
+    private final static String QUEUED_COUNT_METRIC = "queuedCount";
+    private final static String QUEUED_BYTES_METRIC = "queuedBytes";
+    private final static String INPUT_COUNT_METRIC = "inputCount";
+    private final static String INPUT_BYTES_METRIC = "inputBytes";
+    private final static String OUTPUT_COUNT_METRIC = "outputCount";
+    private final static String OUTPUT_BYTES_METRIC = "outputBytes";
+
+
+    public static Map<String, Tuple<StatusAnalyticsModel, 
StatusMetricExtractFunction>> getConnectionStatusModelMap(ExtensionManager 
extensionManager, NiFiProperties niFiProperties){
+            Map<String, Tuple<StatusAnalyticsModel, 
StatusMetricExtractFunction>> modelMap = new HashMap<>();
+            StatusMetricExtractFunction extract = 
getConnectionStatusExtractFunction();
+            Tuple<StatusAnalyticsModel, StatusMetricExtractFunction> 
countModelFunction = new Tuple<>(createModelInstance(extensionManager, 
niFiProperties), extract);
+            Tuple<StatusAnalyticsModel, StatusMetricExtractFunction> 
byteModelFunction = new Tuple<>(createModelInstance(extensionManager, 
niFiProperties), extract);
+            modelMap.put(QUEUED_COUNT_METRIC, countModelFunction);
+            modelMap.put(QUEUED_BYTES_METRIC, byteModelFunction);
+            return modelMap;
+    }
+
+    private static StatusAnalyticsModel createModelInstance(ExtensionManager 
extensionManager, NiFiProperties nifiProperties) {
+        final String implementationClassName = 
nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION,
 NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION);
+        if (implementationClassName == null) {
+            throw new RuntimeException("Cannot create Analytics Model because 
the NiFi Properties is missing the following property: "
+                    + 
NiFiProperties.ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION);
+        }
+        try {
+            return 
NarThreadContextClassLoader.createInstance(extensionManager, 
implementationClassName, StatusAnalyticsModel.class, nifiProperties);
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static StatusMetricExtractFunction 
getConnectionStatusExtractFunction() {
+
+        return (metric, statusHistory) -> {
+
+            List<Double> values = new ArrayList<>();
+            List<Double[]> features = new ArrayList<>();
+            Random rand = new Random();
+            StatusHistoryDTO statusHistoryDTO = 
StatusHistoryUtil.createStatusHistoryDTO(statusHistory);
+
+            for (StatusSnapshotDTO snap : 
statusHistoryDTO.getAggregateSnapshots()) {
+                List<Double> featureArray = new ArrayList<>();
+                Long snapValue = snap.getStatusMetrics().get(metric);
+                long snapTime = snap.getTimestamp().getTime();
+
+                featureArray.add((double) snapTime);
+                Double randomError = +(rand.nextInt(1000) * .0000001);
+                if (metric.equals(QUEUED_COUNT_METRIC)) {
+
+                    Long inputCount = 
snap.getStatusMetrics().get(INPUT_COUNT_METRIC);
+                    Long outputCount = 
snap.getStatusMetrics().get(OUTPUT_COUNT_METRIC);
+                    Double inOutRatio = ((double) outputCount / (double) 
inputCount) + randomError;
+                    featureArray.add(Double.isNaN(inOutRatio) ? randomError : 
inOutRatio);
+
+                } else {
+                    Long inputBytes = 
snap.getStatusMetrics().get(INPUT_BYTES_METRIC);
+                    Long outputBytes = 
snap.getStatusMetrics().get(OUTPUT_BYTES_METRIC);
+                    Double inOutRatio = ((double) outputBytes / (double) 
inputBytes) + randomError;
+                    featureArray.add(Double.isNaN(inOutRatio) ? randomError : 
inOutRatio);
+                }
+
+                values.add((double) snapValue);
+                features.add(featureArray.toArray(new 
Double[featureArray.size()]));
+
+            }
+            return new Tuple<>(features.stream(), values.stream());
+
+        };
+    }
+
+
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/VariateStatusAnalyticsModel.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusMetricExtractFunction.java
similarity index 64%
rename from 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/VariateStatusAnalyticsModel.java
rename to 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusMetricExtractFunction.java
index 44ce6b5..38d2c86 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/VariateStatusAnalyticsModel.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusMetricExtractFunction.java
@@ -14,24 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.controller.status.analytics.models;
+package org.apache.nifi.controller.status.analytics;
 
-import java.util.Map;
 import java.util.stream.Stream;
 
-import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel;
+import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.util.Tuple;
 
-public interface VariateStatusAnalyticsModel extends StatusAnalyticsModel {
+public interface StatusMetricExtractFunction {
 
-    void learn(Stream<Double[]> features, Stream<Double> labels);
-
-    Double predict(Double[] feature);
-
-    Boolean supportsOnlineLearning();
-
-    Double getRSquared();
-
-    Map<String,Double> getScores();
-
-    void clear();
+    Tuple<Stream<Double[]>, Stream<Double>> extractMetric(String metric, 
StatusHistory statusHistory);
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java
index a1e2729..9dcc45b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java
@@ -19,14 +19,24 @@ package org.apache.nifi.controller.status.analytics.models;
 import java.util.Map;
 import java.util.stream.Stream;
 
-public abstract class BivariateStatusAnalyticsModel implements 
VariateStatusAnalyticsModel {
+import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel;
 
+public abstract class BivariateStatusAnalyticsModel implements 
StatusAnalyticsModel {
+
+    @Override
     public abstract void learn(Stream<Double[]> features, Stream<Double> 
labels);
 
-    public  Double predict(Double[] feature){
+    @Override
+    public Double predict(Double[] feature){
         return predictY(feature[0]);
     }
 
+    @Override
+    public Double predictVariable(Integer predictVariableIndex, Map<Integer, 
Double> knownVariablesWithIndex, Double label) {
+        return predictY(label);
+    }
+
+    @Override
     public Boolean supportsOnlineLearning() {
         return false;
     }
@@ -35,8 +45,7 @@ public abstract class BivariateStatusAnalyticsModel 
implements VariateStatusAnal
 
     public abstract Double predictY(Double x);
 
-    public abstract Double getRSquared();
-
+    @Override
     public abstract Map<String,Double> getScores();
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/MultivariateStatusAnalyticsModel.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/MultivariateStatusAnalyticsModel.java
deleted file mode 100644
index f4a98ab..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/MultivariateStatusAnalyticsModel.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.status.analytics.models;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
-
-import org.apache.nifi.util.Tuple;
-
-public abstract class MultivariateStatusAnalyticsModel implements 
VariateStatusAnalyticsModel {
-
-    public abstract void learn(Stream<Double[]> features, Stream<Double> 
labels);
-
-    public abstract Double predict(Double[] feature);
-
-    @Override
-    public Boolean supportsOnlineLearning() {
-        return false;
-    }
-
-    public abstract Double predictVariable(Integer variableIndex, 
List<Tuple<Integer,Double>> predictorVariablesWithIndex, Double label);
-
-    public abstract Double getRSquared();
-
-    public abstract Map<String,Double> getScores();
-
-    @Override
-    public void clear() {
-
-    }
-
-}
-
-
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquaresMSAM.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquares.java
similarity index 78%
rename from 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquaresMSAM.java
rename to 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquares.java
index 4340a8c..74aa8f8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquaresMSAM.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquares.java
@@ -17,24 +17,23 @@
 package org.apache.nifi.controller.status.analytics.models;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression;
-import org.apache.nifi.util.Tuple;
+import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class OrdinaryLeastSquaresMSAM extends MultivariateStatusAnalyticsModel 
{
+public class OrdinaryLeastSquares implements StatusAnalyticsModel {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(OrdinaryLeastSquaresMSAM.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(OrdinaryLeastSquares.class);
     private OLSMultipleLinearRegression olsModel;
     private double[] coefficients;
 
-    public OrdinaryLeastSquaresMSAM() {
+    public OrdinaryLeastSquares() {
         this.olsModel = new OLSMultipleLinearRegression();
     }
 
@@ -63,13 +62,14 @@ public class OrdinaryLeastSquaresMSAM extends 
MultivariateStatusAnalyticsModel {
     }
 
     @Override
-    public Double predictVariable(Integer variableIndex, List<Tuple<Integer, 
Double>> predictorVariablesWithIndex, Double label) {
+    public Double predictVariable(Integer predictVariableIndex, Map<Integer, 
Double> knownVariablesWithIndex, Double label) {
         if (coefficients != null) {
             final double intercept = olsModel.isNoIntercept() ? 0 : 
coefficients[0];
-            final double predictorCoeff = coefficients[variableIndex + 1];
+            final double predictorCoeff = coefficients[predictVariableIndex + 
1];
             double sumX = 0;
-            if (predictorVariablesWithIndex.size() > 0) {
-                sumX = predictorVariablesWithIndex.stream().map(featureTuple 
-> coefficients[olsModel.isNoIntercept() ? featureTuple.getKey() : 
featureTuple.getKey() + 1] * featureTuple.getValue())
+            if (knownVariablesWithIndex.size() > 0) {
+                sumX = 
knownVariablesWithIndex.entrySet().stream().map(featureTuple -> 
coefficients[olsModel.isNoIntercept()
+                                                            ? 
featureTuple.getKey() : featureTuple.getKey() + 1] * featureTuple.getValue())
                                                            
.collect(Collectors.summingDouble(Double::doubleValue));
             }
             return (label - intercept - sumX) / predictorCoeff;
@@ -91,13 +91,12 @@ public class OrdinaryLeastSquaresMSAM extends 
MultivariateStatusAnalyticsModel {
     }
 
     @Override
-    public Double getRSquared() {
-        if (coefficients != null) {
-            return olsModel.calculateRSquared();
-        } else {
-            return null;
-        }
+    public Boolean supportsOnlineLearning() {
+        return false;
     }
 
+    @Override
+    public void clear() {
 
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegressionBSAM.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegression.java
similarity index 80%
rename from 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegressionBSAM.java
rename to 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegression.java
index 39ffce3..a6f1455 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegressionBSAM.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegression.java
@@ -23,21 +23,24 @@ import java.util.stream.Stream;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.math3.stat.regression.RegressionResults;
-import org.apache.commons.math3.stat.regression.SimpleRegression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel {
+public class SimpleRegression extends BivariateStatusAnalyticsModel {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(SimpleRegressionBSAM.class);
-    private final SimpleRegression regression;
-    private final Boolean clearObservationsOnLearn;
+    private static final Logger LOG = 
LoggerFactory.getLogger(SimpleRegression.class);
+    private final org.apache.commons.math3.stat.regression.SimpleRegression 
regression;
+    private final Boolean supportOnlineLearning;
     private RegressionResults results;
 
-    public SimpleRegressionBSAM(Boolean clearObservationsOnLearn) {
+    public SimpleRegression() {
+        this.regression = new 
org.apache.commons.math3.stat.regression.SimpleRegression();
+        this.supportOnlineLearning = true;
+    }
 
-        this.regression = new SimpleRegression();
-        this.clearObservationsOnLearn = clearObservationsOnLearn;
+    public SimpleRegression(Boolean supportOnlineLearning) {
+        this.regression = new 
org.apache.commons.math3.stat.regression.SimpleRegression();
+        this.supportOnlineLearning = supportOnlineLearning;
     }
 
     @Override
@@ -45,7 +48,7 @@ public class SimpleRegressionBSAM extends 
BivariateStatusAnalyticsModel {
         double[] labelArray = 
ArrayUtils.toPrimitive(labels.toArray(Double[]::new));
         double[][] featuresMatrix = features.map(feature -> 
ArrayUtils.toPrimitive(feature)).toArray(double[][]::new);
 
-        if (clearObservationsOnLearn) {
+        if (!supportOnlineLearning) {
             regression.clear();
         }
 
@@ -67,15 +70,6 @@ public class SimpleRegressionBSAM extends 
BivariateStatusAnalyticsModel {
     }
 
     @Override
-    public Double getRSquared() {
-        if (results != null) {
-            return results.getRSquared();
-        } else {
-            return null;
-        }
-    }
-
-    @Override
     public Map<String, Double> getScores() {
         if(results == null){
             return null;
@@ -97,7 +91,7 @@ public class SimpleRegressionBSAM extends 
BivariateStatusAnalyticsModel {
 
     @Override
     public Boolean supportsOnlineLearning() {
-        return true;
+        return supportOnlineLearning;
     }
 
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.status.analytics.StatusAnalyticsModel
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.status.analytics.StatusAnalyticsModel
new file mode 100644
index 0000000..f4e70e1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.status.analytics.StatusAnalyticsModel
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares
+org.apache.nifi.controller.status.analytics.models.SimpleRegression
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
index 2c86bfc..57b300f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
@@ -16,25 +16,32 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.util.Map;
+
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.util.Tuple;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
 public class TestCachingConnectionStatusAnalyticsEngine extends 
TestStatusAnalyticsEngine {
 
     @Override
     public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager 
flowManager, FlowFileEventRepository flowFileEventRepository,
-                                                          
ComponentStatusRepository componentStatusRepository, long 
predictIntervalMillis) {
-        return new CachingConnectionStatusAnalyticsEngine(flowManager, 
componentStatusRepository, flowFileEventRepository, predictIntervalMillis);
+                                                          
ComponentStatusRepository componentStatusRepository,
+                                                          Map<String, 
Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
+                                                          long 
predictIntervalMillis, String scoreName, double scoreThreshold) {
+
+        return new CachingConnectionStatusAnalyticsEngine(flowManager, 
componentStatusRepository, flowFileEventRepository, modelMap, 
predictIntervalMillis, scoreName, scoreThreshold);
     }
 
     @Test
     public void testCachedStatusAnalytics() {
-        StatusAnalyticsEngine statusAnalyticsEngine = new 
CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, 
flowFileEventRepository, DEFAULT_PREDICT_INTERVAL_MILLIS);
+        StatusAnalyticsEngine statusAnalyticsEngine = new 
CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, 
flowFileEventRepository, modelMap,
+                                                                               
                     DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_SCORE_NAME, 
DEFAULT_SCORE_THRESHOLD);
         StatusAnalytics statusAnalyticsA = 
statusAnalyticsEngine.getStatusAnalytics("A");
         StatusAnalytics statusAnalyticsB = 
statusAnalyticsEngine.getStatusAnalytics("B");
         StatusAnalytics statusAnalyticsTest = 
statusAnalyticsEngine.getStatusAnalytics("A");
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
index b013ed4..ff92215 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
@@ -25,11 +25,15 @@ import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -43,6 +47,9 @@ import 
org.apache.nifi.controller.status.history.StandardStatusSnapshot;
 import org.apache.nifi.controller.status.history.StatusHistory;
 import org.apache.nifi.controller.status.history.StatusSnapshot;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.util.NiFiProperties;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -54,9 +61,19 @@ public class TestConnectionStatusAnalytics {
 
     protected ConnectionStatusAnalytics getConnectionStatusAnalytics(Long 
queuedBytes, Long queuedCount, String backPressureDataSizeThreshhold,
                                                                      Long 
backPressureObjectThreshold, Boolean isConstantStatus) {
+
         ComponentStatusRepository statusRepository = 
Mockito.mock(ComponentStatusRepository.class);
         FlowManager flowManager;
         flowManager = Mockito.mock(FlowManager.class);
+        final Map<String, String> otherProps = new HashMap<>();
+        final String propsFile = "src/test/resources/conf/nifi.properties";
+        NiFiProperties nifiProperties = 
NiFiProperties.createBasicNiFiProperties(propsFile, otherProps);
+
+        // use the system bundle
+        Bundle systemBundle = SystemBundle.create(nifiProperties);
+        StandardExtensionDiscoveringManager extensionManager = new 
StandardExtensionDiscoveringManager();
+        extensionManager.discoverExtensions(systemBundle, 
Collections.emptySet());
+
         final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
         final StatusHistory statusHistory = Mockito.mock(StatusHistory.class);
         final Connection connection = Mockito.mock(Connection.class);
@@ -64,6 +81,8 @@ public class TestConnectionStatusAnalytics {
         final FlowFileEventRepository flowFileEventRepository = 
Mockito.mock(FlowFileEventRepository.class);
         final RepositoryStatusReport repositoryStatusReport = 
Mockito.mock(RepositoryStatusReport.class);
         final FlowFileEvent flowFileEvent = Mockito.mock(FlowFileEvent.class);
+
+
         final List<Connection> connections = new ArrayList<>();
         final String connectionIdentifier = "1";
         connections.add(connection);
@@ -103,8 +122,10 @@ public class TestConnectionStatusAnalytics {
         
when(flowFileEventRepository.reportTransferEvents(anyLong())).thenReturn(repositoryStatusReport);
         
when(repositoryStatusReport.getReportEntry(anyString())).thenReturn(flowFileEvent);
         when(statusRepository.getConnectionStatusHistory(anyString(), any(), 
any(), anyInt())).thenReturn(statusHistory);
-        ConnectionStatusAnalytics connectionStatusAnalytics = new 
ConnectionStatusAnalytics(statusRepository, 
flowManager,flowFileEventRepository, connectionIdentifier, false);
-        connectionStatusAnalytics.init();
+
+        ConnectionStatusAnalytics connectionStatusAnalytics = new 
ConnectionStatusAnalytics(statusRepository, flowManager,flowFileEventRepository,
+                
StatusAnalyticsModelMapFactory.getConnectionStatusModelMap(extensionManager,nifiProperties),
 connectionIdentifier, false);
+        connectionStatusAnalytics.refresh();
         return connectionStatusAnalytics;
     }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
index 172c3b5..0b5612d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
@@ -16,16 +16,20 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import java.util.Map;
+
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.util.Tuple;
 
 public class TestConnectionStatusAnalyticsEngine extends 
TestStatusAnalyticsEngine {
 
     @Override
     public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager 
flowManager, FlowFileEventRepository flowFileEventRepository,
-                                                          
ComponentStatusRepository statusRepository, long predictIntervalMillis) {
-        return new ConnectionStatusAnalyticsEngine(flowManager, 
statusRepository, flowFileEventRepository, DEFAULT_PREDICT_INTERVAL_MILLIS);
+                                                          
ComponentStatusRepository statusRepository, Map<String, 
Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
+                                                            long 
predictIntervalMillis, String scoreName, double scoreThreshold) {
+        return new ConnectionStatusAnalyticsEngine(flowManager, 
statusRepository, flowFileEventRepository,modelMap, 
DEFAULT_PREDICT_INTERVAL_MILLIS, scoreName, scoreThreshold);
     }
 
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
index f666fdf..fce167d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
@@ -23,13 +23,21 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.when;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
+import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.StatusHistory;
 import org.apache.nifi.controller.status.history.StatusSnapshot;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.Tuple;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -37,16 +45,31 @@ import org.mockito.Mockito;
 public abstract class TestStatusAnalyticsEngine {
 
     static final long DEFAULT_PREDICT_INTERVAL_MILLIS = 3L * 60 * 1000;
+    static final String DEFAULT_SCORE_NAME = "rSquared";
+    static final double DEFAULT_SCORE_THRESHOLD = .9;
 
     protected ComponentStatusRepository statusRepository;
     protected FlowManager flowManager;
     protected FlowFileEventRepository flowFileEventRepository;
+    protected Map<String, Tuple<StatusAnalyticsModel, 
StatusMetricExtractFunction>> modelMap;
 
     @Before
     public void setup() {
 
         statusRepository = Mockito.mock(ComponentStatusRepository.class);
         flowManager = Mockito.mock(FlowManager.class);
+
+        final Map<String, String> otherProps = new HashMap<>();
+        final String propsFile = "src/test/resources/conf/nifi.properties";
+        NiFiProperties nifiProperties = 
NiFiProperties.createBasicNiFiProperties(propsFile, otherProps);
+
+        // use the system bundle
+        Bundle systemBundle = SystemBundle.create(nifiProperties);
+        ExtensionManager extensionManager = new 
StandardExtensionDiscoveringManager();
+        
((StandardExtensionDiscoveringManager)extensionManager).discoverExtensions(systemBundle,
 Collections.emptySet());
+
+        modelMap = 
StatusAnalyticsModelMapFactory.getConnectionStatusModelMap(extensionManager,nifiProperties);
+
         ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
         StatusHistory statusHistory = Mockito.mock(StatusHistory.class);
         StatusSnapshot statusSnapshot = Mockito.mock(StatusSnapshot.class);
@@ -57,12 +80,14 @@ public abstract class TestStatusAnalyticsEngine {
 
     @Test
     public void testGetStatusAnalytics() {
-        StatusAnalyticsEngine statusAnalyticsEngine = 
getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository, 
DEFAULT_PREDICT_INTERVAL_MILLIS);
+        StatusAnalyticsEngine statusAnalyticsEngine = 
getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository, 
modelMap, DEFAULT_PREDICT_INTERVAL_MILLIS,
+                                                                               
 DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
         StatusAnalytics statusAnalytics = 
statusAnalyticsEngine.getStatusAnalytics("1");
         assertNotNull(statusAnalytics);
     }
 
     public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager 
flowManager, FlowFileEventRepository flowFileEventRepository,
-                                                                   
ComponentStatusRepository componentStatusRepository, long 
predictIntervalMillis);
+                                                                   
ComponentStatusRepository componentStatusRepository, Map<String, 
Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>>  modelMap,
+                                                                    long 
predictIntervalMillis, String scoreName, double scoreThreshold);
 
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsModelMapFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsModelMapFactory.java
new file mode 100644
index 0000000..98b253a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsModelMapFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import static org.junit.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.Tuple;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStatusAnalyticsModelMapFactory {
+
+    protected NiFiProperties nifiProperties;
+    protected ExtensionManager extensionManager;
+
+    @Before
+    public void setup() {
+        final Map<String, String> otherProps = new HashMap<>();
+        final String propsFile = "src/test/resources/conf/nifi.properties";
+        nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, 
otherProps);
+
+        // use the system bundle
+        Bundle systemBundle = SystemBundle.create(nifiProperties);
+        extensionManager = new StandardExtensionDiscoveringManager();
+        ((StandardExtensionDiscoveringManager) 
extensionManager).discoverExtensions(systemBundle, Collections.emptySet());
+    }
+
+    @Test
+    public void getConnectionStatusModelMap() {
+        Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> 
modelMap = StatusAnalyticsModelMapFactory
+                .getConnectionStatusModelMap(extensionManager, nifiProperties);
+
+        assertNotNull(modelMap.get("queuedCount"));
+        assertNotNull(modelMap.get("queuedBytes"));
+        StatusAnalyticsModel countModel = modelMap.get("queuedCount").getKey();
+        StatusAnalyticsModel bytesModel = modelMap.get("queuedBytes").getKey();
+        assertNotNull(countModel);
+        assertNotNull(bytesModel);
+        
assertEquals(countModel.getClass().getName(),"org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares");
+        
assertEquals(bytesModel.getClass().getName(),"org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares");
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqauresMSAM.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqaures.java
similarity index 80%
rename from 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqauresMSAM.java
rename to 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqaures.java
index d4775c2..b51b2fd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqauresMSAM.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqaures.java
@@ -18,18 +18,16 @@ package org.apache.nifi.controller.status.analytics.models;
 
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.Date;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.commons.math3.linear.SingularMatrixException;
-import org.apache.nifi.util.Tuple;
 import org.junit.Test;
 
-public class TestOrdinaryLeastSqauresMSAM {
+public class TestOrdinaryLeastSqaures {
 
 
     @Test
@@ -48,7 +46,7 @@ public class TestOrdinaryLeastSqauresMSAM {
         Double[][] features = {feature0, feature1,feature2,feature3};
         Double[] labels = {queueCount,queueCount,queueCount, queueCount};
 
-        OrdinaryLeastSquaresMSAM model = new OrdinaryLeastSquaresMSAM();
+        OrdinaryLeastSquares model = new OrdinaryLeastSquares();
         boolean exOccurred = false;
         try {
             model.learn(Stream.of(features), Stream.of(labels));
@@ -75,15 +73,14 @@ public class TestOrdinaryLeastSqauresMSAM {
         Double[][] features = {feature0, feature1,feature2,feature3};
         Double[] labels = {queueCount,queueCount + 50, queueCount - 50, 
queueCount - 100};
 
-        OrdinaryLeastSquaresMSAM model = new OrdinaryLeastSquaresMSAM();
+        OrdinaryLeastSquares model = new OrdinaryLeastSquares();
 
         model.learn(Stream.of(features), Stream.of(labels));
 
-        Tuple<Integer,Double> ratioPredictor = new Tuple<>(1,200/800.0);
-        List<Tuple<Integer,Double>> predictorVars = new ArrayList<>();
-        predictorVars.add(ratioPredictor);
+        Map<Integer,Double> predictorVars = new HashMap<>();
+        predictorVars.put(1,200/800.0);
         Double target = model.predictVariable(0,predictorVars, 750.0);
-        Double rSquared = model.getRSquared();
+        Double rSquared = model.getScores().get("rSquared");
         assert(rSquared > .90);
         Date targetDate = new Date(target.longValue());
         Date testDate = new Date(timestamp.longValue());
@@ -108,13 +105,13 @@ public class TestOrdinaryLeastSqauresMSAM {
         Double[] labels = {queueCount,queueCount + 50, queueCount - 50, 
queueCount - 100};
 
 
-        OrdinaryLeastSquaresMSAM model = new OrdinaryLeastSquaresMSAM();
+        OrdinaryLeastSquares model = new OrdinaryLeastSquares();
 
         Double[] predictor = {timestamp + 5000, outputCount/inputCount};
 
         model.learn(Stream.of(features), Stream.of(labels));
         Double target = model.predict(predictor);
-        Double rSquared = model.getRSquared();
+        Double rSquared = model.getScores().get("rSquared");
         assert(rSquared > .90);
         assert(target >= 950);
 
@@ -136,17 +133,17 @@ public class TestOrdinaryLeastSqauresMSAM {
         Double[][] features = {feature0, feature1,feature2,feature3};
         Double[] labels = {queueCount,queueCount + 50, queueCount - 50, 
queueCount - 100};
 
-        OrdinaryLeastSquaresMSAM ordinaryLeastSquaresMSAM = new 
OrdinaryLeastSquaresMSAM();
-        SimpleRegressionBSAM simpleRegressionBSAM = new 
SimpleRegressionBSAM(false);
+        OrdinaryLeastSquares ordinaryLeastSquares = new OrdinaryLeastSquares();
+        SimpleRegression simpleRegression = new SimpleRegression(false);
 
-        ordinaryLeastSquaresMSAM.learn(Stream.of(features), Stream.of(labels));
-        simpleRegressionBSAM.learn(Stream.of(features), Stream.of(labels));
-        double olsR2 = ordinaryLeastSquaresMSAM.getRSquared();
-        double srR2 = simpleRegressionBSAM.getRSquared();
+        ordinaryLeastSquares.learn(Stream.of(features), Stream.of(labels));
+        simpleRegression.learn(Stream.of(features), Stream.of(labels));
+        double olsR2 = ordinaryLeastSquares.getScores().get("rSquared");
+        double srR2 = simpleRegression.getScores().get("rSquared");
         assert(!Double.isNaN(olsR2));
         assert(!Double.isNaN(srR2));
-        Map<String,Double> olsScores = ordinaryLeastSquaresMSAM.getScores();
-        Map<String,Double> srScores = simpleRegressionBSAM.getScores();
+        Map<String,Double> olsScores = ordinaryLeastSquares.getScores();
+        Map<String,Double> srScores = simpleRegression.getScores();
         System.out.print(olsScores.toString());
         System.out.print(srScores.toString());
         assert(olsR2 > srR2);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegressionBSAM.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegression.java
similarity index 91%
rename from 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegressionBSAM.java
rename to 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegression.java
index 71da432..b0ecc08 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegressionBSAM.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegression.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertNotNull;
 import java.util.stream.Stream;
 import org.junit.Test;
 
-public class TestSimpleRegressionBSAM {
+public class TestSimpleRegression {
 
     @Test
     public void testConstantPrediction(){
@@ -37,7 +37,7 @@ public class TestSimpleRegressionBSAM {
         Double[][] features = {feature0, feature1,feature2,feature3};
         Double[] labels = {queueCount,queueCount,queueCount, queueCount};
 
-        SimpleRegressionBSAM model = new SimpleRegressionBSAM(false);
+        SimpleRegression model = new SimpleRegression(false);
 
         model.learn(Stream.of(features), Stream.of(labels));
 
@@ -62,7 +62,7 @@ public class TestSimpleRegressionBSAM {
         Double[][] features = {feature0, feature1,feature2,feature3};
         Double[] labels = {queueCount,queueCount + 50, queueCount - 50, 
queueCount - 100};
 
-        SimpleRegressionBSAM model = new SimpleRegressionBSAM(false);
+        SimpleRegression model = new SimpleRegression(false);
 
         model.learn(Stream.of(features), Stream.of(labels));
 
@@ -87,13 +87,13 @@ public class TestSimpleRegressionBSAM {
         Double[][] features = {feature0, feature1,feature2,feature3};
         Double[] labels = {queueCount,queueCount + 50, queueCount - 50, 
queueCount - 100};
 
-        SimpleRegressionBSAM model = new SimpleRegressionBSAM(false);
+        SimpleRegression model = new SimpleRegression(false);
 
         Double[] predictor = {timestamp + 5000};
 
         model.learn(Stream.of(features), Stream.of(labels));
         Double target = model.predict(predictor);
-        Double rSquared = model.getRSquared();
+        Double rSquared = model.getScores().get("rSquared");
         Double minCount = -1265.0;
         Double maxCount = 3235.0;
         assert(rSquared > .60);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties
index 3d9df16..9cfc3fb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties
@@ -121,4 +121,7 @@ nifi.cluster.manager.protocol.threads=10
 nifi.cluster.manager.safemode.duration=0 sec
 
 # analytics properties #
-nifi.analytics.predict.interval=3 mins
\ No newline at end of file
+nifi.analytics.predict.interval=3 mins
+nifi.analytics.connection.model.implementation=org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares
+nifi.analytics.connection.model.score.name=rSquared
+nifi.analytics.connection.model.score.threshold=.9
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
index a24d29c..f037569 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
@@ -16,6 +16,22 @@
  */
 package org.apache.nifi.nar;
 
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.authentication.LoginIdentityProvider;
 import org.apache.nifi.authorization.AccessPolicyProvider;
@@ -30,6 +46,7 @@ import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.init.ConfigurableComponentInitializer;
@@ -42,22 +59,6 @@ import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
 /**
  * Scans through the classpath to load all FlowFileProcessors, 
FlowFileComparators, and ReportingTasks using the service provider API and 
running through all classloaders (root, NARs).
  *
@@ -95,6 +96,7 @@ public class StandardExtensionDiscoveringManager implements 
ExtensionDiscovering
         definitionMap.put(FlowFileSwapManager.class, new HashSet<>());
         definitionMap.put(ContentRepository.class, new HashSet<>());
         definitionMap.put(StateProvider.class, new HashSet<>());
+        definitionMap.put(StatusAnalyticsModel.class, new HashSet<>());
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 15ac294..fbed8f9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -211,6 +211,9 @@
 
         <!-- nifi.properties: analytics properties -->
         <nifi.analytics.predict.interval>3 
mins</nifi.analytics.predict.interval>
+        
<nifi.analytics.connection.model.implementation>org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares</nifi.analytics.connection.model.implementation>
+        
<nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name>
+        
<nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold>
     </properties>
     <build>
         <plugins>
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index e6140fa..3e0c324 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -258,4 +258,7 @@ 
nifi.kerberos.spnego.authentication.expiration=${nifi.kerberos.spnego.authentica
 nifi.variable.registry.properties=
 
 # analytics properties #
-nifi.analytics.predict.interval=${nifi.analytics.predict.interval}
\ No newline at end of file
+nifi.analytics.predict.interval=${nifi.analytics.predict.interval}
+nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model.implementation}
+nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name}
+nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold}

Reply via email to