This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 8810f9c7b96a9bd3833eda703085e7938ff0a312 Author: Yolanda Davis <[email protected]> AuthorDate: Tue Aug 20 19:59:58 2019 -0400 NIFI-6566 - Refactor to decouple model instance from status analytics object. Also allow configurable model from nifi.properties NIFI-6566 - changes to allow scoring configurations for model in nifi.properties NIFI-6566 - added default implementation value to NiFiProperties NIFI-6566 - correction to default variable name in NiFiProperties, removed unnecessary init method from ConnectionStatusAnalytics Signed-off-by: Matthew Burgess <[email protected]> This closes #3663 --- .../status/analytics/StatusAnalyticsModel.java | 4 + .../java/org/apache/nifi/util/NiFiProperties.java | 7 +- .../resources/NiFiProperties/conf/nifi.properties | 3 +- .../src/main/asciidoc/administration-guide.adoc | 4 + .../org/apache/nifi/controller/FlowController.java | 27 ++- .../CachingConnectionStatusAnalyticsEngine.java | 27 ++- .../analytics/ConnectionStatusAnalytics.java | 185 +++++++++------------ .../analytics/ConnectionStatusAnalyticsEngine.java | 26 ++- .../analytics/StatusAnalyticsModelMapFactory.java | 106 ++++++++++++ ...Model.java => StatusMetricExtractFunction.java} | 20 +-- .../models/BivariateStatusAnalyticsModel.java | 17 +- .../models/MultivariateStatusAnalyticsModel.java | 50 ------ ...tSquaresMSAM.java => OrdinaryLeastSquares.java} | 29 ++-- ...leRegressionBSAM.java => SimpleRegression.java} | 32 ++-- ...ontroller.status.analytics.StatusAnalyticsModel | 16 ++ ...TestCachingConnectionStatusAnalyticsEngine.java | 19 ++- .../analytics/TestConnectionStatusAnalytics.java | 25 ++- .../TestConnectionStatusAnalyticsEngine.java | 8 +- .../analytics/TestStatusAnalyticsEngine.java | 29 +++- .../TestStatusAnalyticsModelMapFactory.java | 66 ++++++++ ...uresMSAM.java => TestOrdinaryLeastSqaures.java} | 37 ++--- ...gressionBSAM.java => TestSimpleRegression.java} | 10 +- .../src/test/resources/conf/nifi.properties | 5 +- 
.../nar/StandardExtensionDiscoveringManager.java | 34 ++-- .../nifi-framework/nifi-resources/pom.xml | 3 + .../src/main/resources/conf/nifi.properties | 5 +- 26 files changed, 497 insertions(+), 297 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java index 4869a0f..8d601e6 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java @@ -16,12 +16,16 @@ */ package org.apache.nifi.controller.status.analytics; +import java.util.Map; import java.util.stream.Stream; public interface StatusAnalyticsModel { void learn(Stream<Double[]> features, Stream<Double> labels); Double predict(Double[] feature); + Double predictVariable(Integer predictVariableIndex, Map<Integer,Double> knownVariablesWithIndex, Double label); Boolean supportsOnlineLearning(); + Map<String,Double> getScores(); + void clear(); } diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index aa00793..32e273e 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -240,6 +240,9 @@ public abstract class NiFiProperties { // analytics properties public static final String ANALYTICS_PREDICTION_INTERVAL = "nifi.analytics.predict.interval"; + public static final String ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "nifi.analytics.connection.model.implementation"; + public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME= "nifi.analytics.connection.model.score.name"; + public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = 
"nifi.analytics.connection.model.score.threshold"; // defaults public static final Boolean DEFAULT_AUTO_RESUME_STATE = true; @@ -313,7 +316,9 @@ public abstract class NiFiProperties { // analytics defaults public static final String DEFAULT_ANALYTICS_PREDICTION_INTERVAL = "3 mins"; - + public final static String DEFAULT_ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares"; + public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = "rSquared"; + public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = .90; /** * Retrieves the property value for the given property key. diff --git a/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties b/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties index 9ab3d99..d3bd5d7 100644 --- a/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties +++ b/nifi-commons/nifi-properties/src/test/resources/NiFiProperties/conf/nifi.properties @@ -122,4 +122,5 @@ nifi.cluster.manager.protocol.threads=10 nifi.cluster.manager.safemode.duration=0 sec # analytics properties # -nifi.analytics.predict.interval=3 mins \ No newline at end of file +nifi.analytics.predict.interval=3 mins +nifi.analytics.connection.model.implementation=org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares \ No newline at end of file diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 6fccde9..0b2fd8c 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -3325,6 +3325,10 @@ These properties determine the behavior of the internal NiFi Analytics capabilit |==== |*Property*|*Description* |`nifi.analytics.predict.interval`|This indicates a time interval for which analytical predictions (queue saturation, e.g.) should be made. 
The default value is `3 mins`. +|`nifi.analytics.connection.model.implementation`|This is the implementation class for the status analytics model used to make connection predictions. The default value is `org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares`. +|`nifi.analytics.connection.model.score.name`|This is the name of the scoring type that should be used to evaluate the model. The default value is `rSquared`. +|`nifi.analytics.connection.model.score.threshold`|This is the threshold for the scoring value (the score should be above the given threshold). The default value is `.9`. + |==== [[custom_properties]] diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 55ff0bb..eeb9f82 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -149,8 +149,11 @@ import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; import org.apache.nifi.controller.state.server.ZooKeeperStateServer; -import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalyticsEngine; +import org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine; import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; +import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel; +import org.apache.nifi.controller.status.analytics.StatusAnalyticsModelMapFactory; +import
org.apache.nifi.controller.status.analytics.StatusMetricExtractFunction; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.GarbageCollectionHistory; import org.apache.nifi.controller.status.history.GarbageCollectionStatus; @@ -210,6 +213,7 @@ import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; +import org.apache.nifi.util.Tuple; import org.apache.nifi.util.concurrency.TimedLock; import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; @@ -602,8 +606,25 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node predictionIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL, TimeUnit.MILLISECONDS); } + // Determine score name to use for evaluating model performance + String modelScoreName = nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_NAME, NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME); + + // Determine score threshold to use when evaluating acceptable model + Double modelScoreThreshold; + try { + modelScoreThreshold = Double.valueOf(nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_NAME, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL)); + }catch (final Exception e) { + modelScoreThreshold = Double.valueOf(NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD); + } + componentStatusRepository = createComponentStatusRepository(); - analyticsEngine = new ConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, predictionIntervalMillis); + + final Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = StatusAnalyticsModelMapFactory + .getConnectionStatusModelMap(extensionManager, nifiProperties); + + 
analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap, + predictionIntervalMillis, modelScoreName, modelScoreThreshold); + eventAccess = new StandardEventAccess(this, flowFileEventRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { @@ -1042,8 +1063,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } - - public KerberosConfig createKerberosConfig(final NiFiProperties nifiProperties) { final String principal = nifiProperties.getKerberosServicePrincipal(); final String keytabLocation = nifiProperties.getKerberosServiceKeytabLocation(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java index 95f655e..0cda68c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java @@ -16,49 +16,44 @@ */ package org.apache.nifi.controller.status.analytics; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.status.history.ComponentStatusRepository; +import org.apache.nifi.util.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -public class 
CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { - private final ComponentStatusRepository statusRepository; - private final FlowManager flowManager; - private final FlowFileEventRepository flowFileEventRepository; - private volatile Cache<String, ConnectionStatusAnalytics> cache; - private final long predictionIntervalMillis; +public class CachingConnectionStatusAnalyticsEngine extends ConnectionStatusAnalyticsEngine{ + private volatile Cache<String, StatusAnalytics> cache; private static final Logger LOG = LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class); public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, - FlowFileEventRepository flowFileEventRepository, long predictionIntervalMillis) { - this.flowManager = flowManager; - this.statusRepository = statusRepository; - this.flowFileEventRepository = flowFileEventRepository; + FlowFileEventRepository flowFileEventRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, + long predictionIntervalMillis, String scoreName, double scoreThreshold) { + + super(flowManager,statusRepository,flowFileEventRepository,modelMap,predictionIntervalMillis,scoreName,scoreThreshold); this.cache = Caffeine.newBuilder() .expireAfterWrite(30, TimeUnit.MINUTES) .build(); - this.predictionIntervalMillis = predictionIntervalMillis; } @Override public StatusAnalytics getStatusAnalytics(String identifier) { - ConnectionStatusAnalytics connectionStatusAnalytics = cache.getIfPresent(identifier); + StatusAnalytics connectionStatusAnalytics = cache.getIfPresent(identifier); if (connectionStatusAnalytics == null) { LOG.debug("Creating new status analytics object for connection id: {}", identifier); - connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager,flowFileEventRepository, identifier, true); - connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis); 
- connectionStatusAnalytics.init(); + connectionStatusAnalytics = super.getStatusAnalytics(identifier); cache.put(identifier, connectionStatusAnalytics); } else { LOG.debug("Pulled existing analytics from cache for connection id: {}", identifier); - connectionStatusAnalytics.refresh(); + ((ConnectionStatusAnalytics)connectionStatusAnalytics).refresh(); } return connectionStatusAnalytics; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java index 313e7ab..bc2270b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; -import java.util.Random; import java.util.stream.Stream; import org.apache.commons.lang3.ArrayUtils; @@ -31,18 +30,11 @@ import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.repository.FlowFileEvent; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.repository.RepositoryStatusReport; -import org.apache.nifi.controller.status.analytics.models.MultivariateStatusAnalyticsModel; -import org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquaresMSAM; -import org.apache.nifi.controller.status.analytics.models.VariateStatusAnalyticsModel; import org.apache.nifi.controller.status.history.ComponentStatusRepository; -import 
org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; import org.apache.nifi.controller.status.history.StatusHistory; -import org.apache.nifi.controller.status.history.StatusHistoryUtil; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.util.Tuple; -import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +43,7 @@ import com.google.common.primitives.Doubles; public class ConnectionStatusAnalytics implements StatusAnalytics { private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalytics.class); - private Map<String, Tuple<StatusAnalyticsModel, ExtractFunction>> modelMap; + private final Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap; private QueryWindow queryWindow; private final ComponentStatusRepository componentStatusRepository; private final FlowFileEventRepository flowFileEventRepository; @@ -60,47 +52,32 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { private final Boolean supportOnlineLearning; private Boolean extendWindow = false; private long intervalMillis = 3L * 60 * 1000; // Default is 3 minutes - private static double SCORE_THRESHOLD = .90; + private String scoreName = "rSquared"; + private double scoreThreshold = .90; - public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, String connectionIdentifier, - Boolean supportOnlineLearning) { + public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, + Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, String connectionIdentifier, Boolean supportOnlineLearning) { this.componentStatusRepository = 
componentStatusRepository; this.flowManager = flowManager; this.flowFileEventRepository = flowFileEventRepository; + this.modelMap = modelMap; this.connectionIdentifier = connectionIdentifier; this.supportOnlineLearning = supportOnlineLearning; } - public void init() { - - LOG.debug("Initialize analytics connection id: {} ", connectionIdentifier); - - if (this.modelMap == null || this.modelMap.isEmpty()) { - Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new OrdinaryLeastSquaresMSAM(), extract); - Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new OrdinaryLeastSquaresMSAM(), extract); - this.modelMap = new HashMap<>(); - //TODO: Should change keys used here - this.modelMap.put(ConnectionStatusDescriptor.QUEUED_COUNT.getField(), countModelFunction); - this.modelMap.put(ConnectionStatusDescriptor.QUEUED_BYTES.getField(), byteModelFunction); - } - - refresh(); - } - public void refresh() { - if (this.queryWindow == null) { - //Set query window to fresh value - this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis()); - } else if (supportOnlineLearning) { + if (supportOnlineLearning && this.queryWindow != null) { //Obtain latest observations when available, extend window if needed to obtain minimum observations this.queryWindow = new QueryWindow(extendWindow ? 
queryWindow.getStartTimeMillis() : queryWindow.getEndTimeMillis(), System.currentTimeMillis()); - + } else { + this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis()); } + modelMap.forEach((metric, modelFunction) -> { StatusAnalyticsModel model = modelFunction.getKey(); - ExtractFunction extract = modelFunction.getValue(); + StatusMetricExtractFunction extract = modelFunction.getValue(); StatusHistory statusHistory = componentStatusRepository.getConnectionStatusHistory(connectionIdentifier, queryWindow.getStartDateTime(), queryWindow.getEndDateTime(), Integer.MAX_VALUE); Tuple<Stream<Double[]>, Stream<Double>> modelData = extract.extractMetric(metric, statusHistory); Double[][] features = modelData.getKey().toArray(size -> new Double[size][1]); @@ -129,7 +106,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { */ public Long getTimeToBytesBackpressureMillis() { - final MultivariateStatusAnalyticsModel bytesModel = (MultivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey(); + final StatusAnalyticsModel bytesModel = modelMap.get("queuedBytes").getKey(); FlowFileEvent flowFileEvent = getStatusReport(); final Connection connection = getConnection(); @@ -139,12 +116,12 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold(); final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B); - if(validModel(bytesModel) && flowFileEvent != null) { - List<Tuple<Integer, Double>> predictFeatures = new ArrayList<>(); - Double inOutRatio = (flowFileEvent.getContentSizeOut() / (double)flowFileEvent.getContentSizeIn()); - predictFeatures.add(new Tuple<>(1, inOutRatio)); - return getTimePrediction(bytesModel.predictVariable(0, predictFeatures, backPressureBytes), System.currentTimeMillis()); - }else{ + if 
(validModel(bytesModel) && flowFileEvent != null) { + Map<Integer, Double> predictFeatures = new HashMap<>(); + Double inOutRatio = (flowFileEvent.getContentSizeOut() / (double) flowFileEvent.getContentSizeIn()); + predictFeatures.put(1, inOutRatio); + return convertTimePrediction(bytesModel.predictVariable(0, predictFeatures, backPressureBytes), System.currentTimeMillis()); + } else { return -1L; } } @@ -156,7 +133,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { */ public Long getTimeToCountBackpressureMillis() { - final MultivariateStatusAnalyticsModel countModel = (MultivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey(); + final StatusAnalyticsModel countModel = modelMap.get("queuedCount").getKey(); FlowFileEvent flowFileEvent = getStatusReport(); final Connection connection = getConnection(); @@ -166,12 +143,12 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold(); - if(validModel(countModel) && flowFileEvent != null) { - List<Tuple<Integer, Double>> predictFeatures = new ArrayList<>(); - Double inOutRatio = (flowFileEvent.getFlowFilesOut() / (double)flowFileEvent.getFlowFilesIn()); - predictFeatures.add(new Tuple<>(1, inOutRatio)); - return getTimePrediction(countModel.predictVariable(0, predictFeatures, backPressureCountThreshold), System.currentTimeMillis()); - }else{ + if (validModel(countModel) && flowFileEvent != null) { + Map<Integer, Double> predictFeatures = new HashMap<>(); + Double inOutRatio = (flowFileEvent.getFlowFilesOut() / (double) flowFileEvent.getFlowFilesIn()); + predictFeatures.put(1, inOutRatio); + return convertTimePrediction(countModel.predictVariable(0, predictFeatures, backPressureCountThreshold), System.currentTimeMillis()); + } else { return -1L; } } @@ -183,17 +160,17 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { 
*/ public Long getNextIntervalBytes() { - final VariateStatusAnalyticsModel bytesModel = (VariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey(); + final StatusAnalyticsModel bytesModel = modelMap.get("queuedBytes").getKey(); FlowFileEvent flowFileEvent = getStatusReport(); - if(validModel(bytesModel) && flowFileEvent != null) { + if (validModel(bytesModel) && flowFileEvent != null) { List<Double> predictFeatures = new ArrayList<>(); - Long nextInterval = System.currentTimeMillis() + getIntervalTimeMillis(); - Double inOutRatio = flowFileEvent.getContentSizeOut() / (double)flowFileEvent.getContentSizeIn(); + Long nextInterval = System.currentTimeMillis() + getIntervalTimeMillis(); + Double inOutRatio = flowFileEvent.getContentSizeOut() / (double) flowFileEvent.getContentSizeIn(); predictFeatures.add(nextInterval.doubleValue()); predictFeatures.add(inOutRatio); - return (bytesModel.predict(predictFeatures.toArray(new Double[2]))).longValue(); - }else{ + return convertCountPrediction(bytesModel.predict(predictFeatures.toArray(new Double[2]))); + } else { return -1L; } } @@ -205,17 +182,17 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { */ public Long getNextIntervalCount() { - final VariateStatusAnalyticsModel countModel = (VariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey(); + final StatusAnalyticsModel countModel = modelMap.get("queuedCount").getKey(); FlowFileEvent flowFileEvent = getStatusReport(); - if(validModel(countModel) && flowFileEvent != null) { + if (validModel(countModel) && flowFileEvent != null) { List<Double> predictFeatures = new ArrayList<>(); - Long nextInterval = System.currentTimeMillis() + getIntervalTimeMillis(); - Double inOutRatio = flowFileEvent.getFlowFilesOut()/ (double)flowFileEvent.getFlowFilesIn(); + Long nextInterval = System.currentTimeMillis() + getIntervalTimeMillis(); + Double inOutRatio = 
flowFileEvent.getFlowFilesOut() / (double) flowFileEvent.getFlowFilesIn(); predictFeatures.add(nextInterval.doubleValue()); predictFeatures.add(inOutRatio); - return (countModel.predict(predictFeatures.toArray(new Double[2]))).longValue(); - }else{ + return convertCountPrediction(countModel.predict(predictFeatures.toArray(new Double[2]))); + } else { return -1L; } @@ -263,6 +240,22 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { this.intervalMillis = intervalTimeMillis; } + public String getScoreName() { + return scoreName; + } + + public void setScoreName(String scoreName) { + this.scoreName = scoreName; + } + + public double getScoreThreshold() { + return scoreThreshold; + } + + public void setScoreThreshold(double scoreThreshold) { + this.scoreThreshold = scoreThreshold; + } + @Override public QueryWindow getQueryWindow() { return queryWindow; @@ -301,75 +294,47 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { return connection.orElse(null); } - private FlowFileEvent getStatusReport(){ - RepositoryStatusReport statusReport = flowFileEventRepository.reportTransferEvents(System.currentTimeMillis()); + private FlowFileEvent getStatusReport() { + RepositoryStatusReport statusReport = flowFileEventRepository.reportTransferEvents(System.currentTimeMillis()); return statusReport.getReportEntry(this.connectionIdentifier); } - private interface ExtractFunction { - Tuple<Stream<Double[]>, Stream<Double>> extractMetric(String metric, StatusHistory statusHistory); + private Long convertTimePrediction(Double prediction, Long timeMillis) { + if (Double.isNaN(prediction) || Double.isInfinite(prediction) || prediction < timeMillis) { + return -1L; + } else { + return Math.max(0, Math.round(prediction) - timeMillis); + } } - private Long getTimePrediction(Double prediction, Long timeMillis) { - - if (Double.isNaN(prediction) || Double.isInfinite(prediction)) { + private Long convertCountPrediction(Double prediction) { + if 
(Double.isNaN(prediction) || Double.isInfinite(prediction) || prediction < 0) { return -1L; - } else if (prediction < timeMillis) { - return 0L; } else { - return Math.max(0, Math.round(prediction) - timeMillis); + return Math.max(0, Math.round(prediction)); } } - private boolean validModel(VariateStatusAnalyticsModel model){ + private boolean validModel(StatusAnalyticsModel model) { - Double rSquared = model.getRSquared(); + Double score = getScore(model); - if (rSquared == null || (Doubles.isFinite(rSquared) && !Double.isNaN(rSquared) && rSquared < SCORE_THRESHOLD)) { - if(supportOnlineLearning && model.supportsOnlineLearning()){ + if (score == null || (Doubles.isFinite(score) && !Double.isNaN(score) && score < scoreThreshold)) { + if (supportOnlineLearning && model.supportsOnlineLearning()) { model.clear(); } return false; - }else { + } else { return true; } } - private final ExtractFunction extract = (metric, statusHistory) -> { - - List<Double> values = new ArrayList<>(); - List<Double[]> features = new ArrayList<>(); - Random rand = new Random(); - StatusHistoryDTO statusHistoryDTO = StatusHistoryUtil.createStatusHistoryDTO(statusHistory); - - for (StatusSnapshotDTO snap : statusHistoryDTO.getAggregateSnapshots()) { - List<Double> featureArray = new ArrayList<>(); - Long snapValue = snap.getStatusMetrics().get(metric); - long snapTime = snap.getTimestamp().getTime(); - - featureArray.add((double) snapTime); - Double randomError = + (rand.nextInt(1000) * .0000001); - if (metric.equals(ConnectionStatusDescriptor.QUEUED_COUNT.getField())) { - - Long inputCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.INPUT_COUNT.getField()); - Long outputCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.OUTPUT_COUNT.getField()); - Double inOutRatio = ((double)outputCount /(double)inputCount) + randomError; - featureArray.add(Double.isNaN(inOutRatio)? 
randomError : inOutRatio); - - } else { - Long inputBytes = snap.getStatusMetrics().get(ConnectionStatusDescriptor.INPUT_BYTES.getField()); - Long outputBytes = snap.getStatusMetrics().get(ConnectionStatusDescriptor.OUTPUT_BYTES.getField()); - Double inOutRatio = ((double)outputBytes/(double)inputBytes) + randomError; - featureArray.add(Double.isNaN(inOutRatio)? randomError : inOutRatio); - } - - values.add((double) snapValue); - features.add(featureArray.toArray(new Double[featureArray.size()])); - + private Double getScore(StatusAnalyticsModel model) { + if (model != null && model.getScores() != null) { + return model.getScores().get(scoreName); + } else { + return null; } - return new Tuple<Stream<Double[]>, Stream<Double>>(features.stream(), values.stream()); - - }; - + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java index a9ba4ea..de815fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java @@ -16,31 +16,43 @@ */ package org.apache.nifi.controller.status.analytics; +import java.util.Map; + import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.status.history.ComponentStatusRepository; +import org.apache.nifi.util.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine { 
private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class); - private final ComponentStatusRepository statusRepository; - private final FlowManager flowManager; - private final FlowFileEventRepository flowFileEventRepository; - private final long predictionIntervalMillis; + protected final ComponentStatusRepository statusRepository; + protected final FlowManager flowManager; + protected final FlowFileEventRepository flowFileEventRepository; + protected final Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap; + protected final long predictionIntervalMillis; + protected final String scoreName; + protected final double scoreThreshold; - public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository, long predictionIntervalMillis) { + public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository, + Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, long predictionIntervalMillis, String scoreName, double scoreThreshold) { this.flowManager = flowManager; this.statusRepository = statusRepository; this.flowFileEventRepository = flowFileEventRepository; this.predictionIntervalMillis = predictionIntervalMillis; + this.modelMap = modelMap; + this.scoreName = scoreName; + this.scoreThreshold = scoreThreshold; } @Override public StatusAnalytics getStatusAnalytics(String identifier) { - ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager, flowFileEventRepository, identifier, false); + ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager, flowFileEventRepository, modelMap, identifier, false); connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis); - 
connectionStatusAnalytics.init(); + connectionStatusAnalytics.setScoreName(scoreName); + connectionStatusAnalytics.setScoreThreshold(scoreThreshold); + connectionStatusAnalytics.refresh(); return connectionStatusAnalytics; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModelMapFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModelMapFactory.java new file mode 100644 index 0000000..b8e6775 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModelMapFactory.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.status.analytics; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.nifi.controller.status.history.StatusHistoryUtil; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.NarThreadContextClassLoader; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.Tuple; +import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; + +public class StatusAnalyticsModelMapFactory { + + private final static String QUEUED_COUNT_METRIC = "queuedCount"; + private final static String QUEUED_BYTES_METRIC = "queuedBytes"; + private final static String INPUT_COUNT_METRIC = "inputCount"; + private final static String INPUT_BYTES_METRIC = "inputBytes"; + private final static String OUTPUT_COUNT_METRIC = "outputCount"; + private final static String OUTPUT_BYTES_METRIC = "outputBytes"; + + + public static Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> getConnectionStatusModelMap(ExtensionManager extensionManager, NiFiProperties niFiProperties){ + Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = new HashMap<>(); + StatusMetricExtractFunction extract = getConnectionStatusExtractFunction(); + Tuple<StatusAnalyticsModel, StatusMetricExtractFunction> countModelFunction = new Tuple<>(createModelInstance(extensionManager, niFiProperties), extract); + Tuple<StatusAnalyticsModel, StatusMetricExtractFunction> byteModelFunction = new Tuple<>(createModelInstance(extensionManager, niFiProperties), extract); + modelMap.put(QUEUED_COUNT_METRIC, countModelFunction); + modelMap.put(QUEUED_BYTES_METRIC, byteModelFunction); + return modelMap; + } + + private static StatusAnalyticsModel createModelInstance(ExtensionManager extensionManager, NiFiProperties nifiProperties) { + final String implementationClassName = 
nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION, NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION); + if (implementationClassName == null) { + throw new RuntimeException("Cannot create Analytics Model because the NiFi Properties is missing the following property: " + + NiFiProperties.ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION); + } + try { + return NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, StatusAnalyticsModel.class, nifiProperties); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private static StatusMetricExtractFunction getConnectionStatusExtractFunction() { + + return (metric, statusHistory) -> { + + List<Double> values = new ArrayList<>(); + List<Double[]> features = new ArrayList<>(); + Random rand = new Random(); + StatusHistoryDTO statusHistoryDTO = StatusHistoryUtil.createStatusHistoryDTO(statusHistory); + + for (StatusSnapshotDTO snap : statusHistoryDTO.getAggregateSnapshots()) { + List<Double> featureArray = new ArrayList<>(); + Long snapValue = snap.getStatusMetrics().get(metric); + long snapTime = snap.getTimestamp().getTime(); + + featureArray.add((double) snapTime); + Double randomError = +(rand.nextInt(1000) * .0000001); + if (metric.equals(QUEUED_COUNT_METRIC)) { + + Long inputCount = snap.getStatusMetrics().get(INPUT_COUNT_METRIC); + Long outputCount = snap.getStatusMetrics().get(OUTPUT_COUNT_METRIC); + Double inOutRatio = ((double) outputCount / (double) inputCount) + randomError; + featureArray.add(Double.isNaN(inOutRatio) ? randomError : inOutRatio); + + } else { + Long inputBytes = snap.getStatusMetrics().get(INPUT_BYTES_METRIC); + Long outputBytes = snap.getStatusMetrics().get(OUTPUT_BYTES_METRIC); + Double inOutRatio = ((double) outputBytes / (double) inputBytes) + randomError; + featureArray.add(Double.isNaN(inOutRatio) ? 
randomError : inOutRatio); + } + + values.add((double) snapValue); + features.add(featureArray.toArray(new Double[featureArray.size()])); + + } + return new Tuple<>(features.stream(), values.stream()); + + }; + } + + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/VariateStatusAnalyticsModel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusMetricExtractFunction.java similarity index 64% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/VariateStatusAnalyticsModel.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusMetricExtractFunction.java index 44ce6b5..38d2c86 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/VariateStatusAnalyticsModel.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusMetricExtractFunction.java @@ -14,24 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.controller.status.analytics.models; +package org.apache.nifi.controller.status.analytics; -import java.util.Map; import java.util.stream.Stream; -import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel; +import org.apache.nifi.controller.status.history.StatusHistory; +import org.apache.nifi.util.Tuple; -public interface VariateStatusAnalyticsModel extends StatusAnalyticsModel { +public interface StatusMetricExtractFunction { - void learn(Stream<Double[]> features, Stream<Double> labels); - - Double predict(Double[] feature); - - Boolean supportsOnlineLearning(); - - Double getRSquared(); - - Map<String,Double> getScores(); - - void clear(); + Tuple<Stream<Double[]>, Stream<Double>> extractMetric(String metric, StatusHistory statusHistory); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java index a1e2729..9dcc45b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java @@ -19,14 +19,24 @@ package org.apache.nifi.controller.status.analytics.models; import java.util.Map; import java.util.stream.Stream; -public abstract class BivariateStatusAnalyticsModel implements VariateStatusAnalyticsModel { +import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel; +public abstract class BivariateStatusAnalyticsModel implements StatusAnalyticsModel { + + @Override public abstract void learn(Stream<Double[]> 
features, Stream<Double> labels); - public Double predict(Double[] feature){ + @Override + public Double predict(Double[] feature){ return predictY(feature[0]); } + @Override + public Double predictVariable(Integer predictVariableIndex, Map<Integer, Double> knownVariablesWithIndex, Double label) { + return predictY(label); + } + + @Override public Boolean supportsOnlineLearning() { return false; } @@ -35,8 +45,7 @@ public abstract class BivariateStatusAnalyticsModel implements VariateStatusAnal public abstract Double predictY(Double x); - public abstract Double getRSquared(); - + @Override public abstract Map<String,Double> getScores(); @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/MultivariateStatusAnalyticsModel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/MultivariateStatusAnalyticsModel.java deleted file mode 100644 index f4a98ab..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/MultivariateStatusAnalyticsModel.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.status.analytics.models; - -import java.util.List; -import java.util.Map; -import java.util.stream.Stream; - -import org.apache.nifi.util.Tuple; - -public abstract class MultivariateStatusAnalyticsModel implements VariateStatusAnalyticsModel { - - public abstract void learn(Stream<Double[]> features, Stream<Double> labels); - - public abstract Double predict(Double[] feature); - - @Override - public Boolean supportsOnlineLearning() { - return false; - } - - public abstract Double predictVariable(Integer variableIndex, List<Tuple<Integer,Double>> predictorVariablesWithIndex, Double label); - - public abstract Double getRSquared(); - - public abstract Map<String,Double> getScores(); - - @Override - public void clear() { - - } - -} - - diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquaresMSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquares.java similarity index 78% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquaresMSAM.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquares.java index 4340a8c..74aa8f8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquaresMSAM.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquares.java @@ 
-17,24 +17,23 @@ package org.apache.nifi.controller.status.analytics.models; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression; -import org.apache.nifi.util.Tuple; +import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class OrdinaryLeastSquaresMSAM extends MultivariateStatusAnalyticsModel { +public class OrdinaryLeastSquares implements StatusAnalyticsModel { - private static final Logger LOG = LoggerFactory.getLogger(OrdinaryLeastSquaresMSAM.class); + private static final Logger LOG = LoggerFactory.getLogger(OrdinaryLeastSquares.class); private OLSMultipleLinearRegression olsModel; private double[] coefficients; - public OrdinaryLeastSquaresMSAM() { + public OrdinaryLeastSquares() { this.olsModel = new OLSMultipleLinearRegression(); } @@ -63,13 +62,14 @@ public class OrdinaryLeastSquaresMSAM extends MultivariateStatusAnalyticsModel { } @Override - public Double predictVariable(Integer variableIndex, List<Tuple<Integer, Double>> predictorVariablesWithIndex, Double label) { + public Double predictVariable(Integer predictVariableIndex, Map<Integer, Double> knownVariablesWithIndex, Double label) { if (coefficients != null) { final double intercept = olsModel.isNoIntercept() ? 0 : coefficients[0]; - final double predictorCoeff = coefficients[variableIndex + 1]; + final double predictorCoeff = coefficients[predictVariableIndex + 1]; double sumX = 0; - if (predictorVariablesWithIndex.size() > 0) { - sumX = predictorVariablesWithIndex.stream().map(featureTuple -> coefficients[olsModel.isNoIntercept() ? 
featureTuple.getKey() : featureTuple.getKey() + 1] * featureTuple.getValue()) + if (knownVariablesWithIndex.size() > 0) { + sumX = knownVariablesWithIndex.entrySet().stream().map(featureTuple -> coefficients[olsModel.isNoIntercept() + ? featureTuple.getKey() : featureTuple.getKey() + 1] * featureTuple.getValue()) .collect(Collectors.summingDouble(Double::doubleValue)); } return (label - intercept - sumX) / predictorCoeff; @@ -91,13 +91,12 @@ public class OrdinaryLeastSquaresMSAM extends MultivariateStatusAnalyticsModel { } @Override - public Double getRSquared() { - if (coefficients != null) { - return olsModel.calculateRSquared(); - } else { - return null; - } + public Boolean supportsOnlineLearning() { + return false; } + @Override + public void clear() { + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegressionBSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegression.java similarity index 80% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegressionBSAM.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegression.java index 39ffce3..a6f1455 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegressionBSAM.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegression.java @@ -23,21 +23,24 @@ import java.util.stream.Stream; import org.apache.commons.lang3.ArrayUtils; import 
org.apache.commons.math3.stat.regression.RegressionResults; -import org.apache.commons.math3.stat.regression.SimpleRegression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel { +public class SimpleRegression extends BivariateStatusAnalyticsModel { - private static final Logger LOG = LoggerFactory.getLogger(SimpleRegressionBSAM.class); - private final SimpleRegression regression; - private final Boolean clearObservationsOnLearn; + private static final Logger LOG = LoggerFactory.getLogger(SimpleRegression.class); + private final org.apache.commons.math3.stat.regression.SimpleRegression regression; + private final Boolean supportOnlineLearning; private RegressionResults results; - public SimpleRegressionBSAM(Boolean clearObservationsOnLearn) { + public SimpleRegression() { + this.regression = new org.apache.commons.math3.stat.regression.SimpleRegression(); + this.supportOnlineLearning = true; + } - this.regression = new SimpleRegression(); - this.clearObservationsOnLearn = clearObservationsOnLearn; + public SimpleRegression(Boolean supportOnlineLearning) { + this.regression = new org.apache.commons.math3.stat.regression.SimpleRegression(); + this.supportOnlineLearning = supportOnlineLearning; } @Override @@ -45,7 +48,7 @@ public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel { double[] labelArray = ArrayUtils.toPrimitive(labels.toArray(Double[]::new)); double[][] featuresMatrix = features.map(feature -> ArrayUtils.toPrimitive(feature)).toArray(double[][]::new); - if (clearObservationsOnLearn) { + if (!supportOnlineLearning) { regression.clear(); } @@ -67,15 +70,6 @@ public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel { } @Override - public Double getRSquared() { - if (results != null) { - return results.getRSquared(); - } else { - return null; - } - } - - @Override public Map<String, Double> getScores() { if(results == null){ return null; @@ -97,7 
+91,7 @@ public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel { @Override public Boolean supportsOnlineLearning() { - return true; + return supportOnlineLearning; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.status.analytics.StatusAnalyticsModel b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.status.analytics.StatusAnalyticsModel new file mode 100644 index 0000000..f4e70e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.controller.status.analytics.StatusAnalyticsModel @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares +org.apache.nifi.controller.status.analytics.models.SimpleRegression \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java index 2c86bfc..57b300f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java @@ -16,25 +16,32 @@ */ package org.apache.nifi.controller.status.analytics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.util.Map; + import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.status.history.ComponentStatusRepository; +import org.apache.nifi.util.Tuple; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - public class TestCachingConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngine { @Override public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, - ComponentStatusRepository componentStatusRepository, long predictIntervalMillis) { - return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, predictIntervalMillis); + ComponentStatusRepository 
componentStatusRepository, + Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, + long predictIntervalMillis, String scoreName, double scoreThreshold) { + + return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap, predictIntervalMillis, scoreName, scoreThreshold); } @Test public void testCachedStatusAnalytics() { - StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository, DEFAULT_PREDICT_INTERVAL_MILLIS); + StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository, modelMap, + DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD); StatusAnalytics statusAnalyticsA = statusAnalyticsEngine.getStatusAnalytics("A"); StatusAnalytics statusAnalyticsB = statusAnalyticsEngine.getStatusAnalytics("B"); StatusAnalytics statusAnalyticsTest = statusAnalyticsEngine.getStatusAnalytics("A"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java index b013ed4..ff92215 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java @@ -25,11 +25,15 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; +import java.util.HashMap; 
import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.nifi.bundle.Bundle; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -43,6 +47,9 @@ import org.apache.nifi.controller.status.history.StandardStatusSnapshot; import org.apache.nifi.controller.status.history.StatusHistory; import org.apache.nifi.controller.status.history.StatusSnapshot; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.nar.StandardExtensionDiscoveringManager; +import org.apache.nifi.nar.SystemBundle; +import org.apache.nifi.util.NiFiProperties; import org.junit.Test; import org.mockito.Mockito; @@ -54,9 +61,19 @@ public class TestConnectionStatusAnalytics { protected ConnectionStatusAnalytics getConnectionStatusAnalytics(Long queuedBytes, Long queuedCount, String backPressureDataSizeThreshhold, Long backPressureObjectThreshold, Boolean isConstantStatus) { + ComponentStatusRepository statusRepository = Mockito.mock(ComponentStatusRepository.class); FlowManager flowManager; flowManager = Mockito.mock(FlowManager.class); + final Map<String, String> otherProps = new HashMap<>(); + final String propsFile = "src/test/resources/conf/nifi.properties"; + NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, otherProps); + + // use the system bundle + Bundle systemBundle = SystemBundle.create(nifiProperties); + StandardExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager(); + extensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class); final StatusHistory statusHistory = Mockito.mock(StatusHistory.class); final Connection connection = Mockito.mock(Connection.class); @@ -64,6 +81,8 @@ public class TestConnectionStatusAnalytics { final FlowFileEventRepository 
flowFileEventRepository = Mockito.mock(FlowFileEventRepository.class); final RepositoryStatusReport repositoryStatusReport = Mockito.mock(RepositoryStatusReport.class); final FlowFileEvent flowFileEvent = Mockito.mock(FlowFileEvent.class); + + final List<Connection> connections = new ArrayList<>(); final String connectionIdentifier = "1"; connections.add(connection); @@ -103,8 +122,10 @@ public class TestConnectionStatusAnalytics { when(flowFileEventRepository.reportTransferEvents(anyLong())).thenReturn(repositoryStatusReport); when(repositoryStatusReport.getReportEntry(anyString())).thenReturn(flowFileEvent); when(statusRepository.getConnectionStatusHistory(anyString(), any(), any(), anyInt())).thenReturn(statusHistory); - ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager,flowFileEventRepository, connectionIdentifier, false); - connectionStatusAnalytics.init(); + + ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager,flowFileEventRepository, + StatusAnalyticsModelMapFactory.getConnectionStatusModelMap(extensionManager,nifiProperties), connectionIdentifier, false); + connectionStatusAnalytics.refresh(); return connectionStatusAnalytics; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java index 172c3b5..0b5612d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
@@ -16,16 +16,24 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import java.util.Map;
+
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.util.Tuple;
 
 public class TestConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngine {
 
+    /**
+     * Creates the ConnectionStatusAnalyticsEngine under test from the supplied collaborators and settings.
+     */
     @Override
     public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
-            ComponentStatusRepository statusRepository, long predictIntervalMillis) {
-        return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository, DEFAULT_PREDICT_INTERVAL_MILLIS);
+            ComponentStatusRepository statusRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
+            long predictIntervalMillis, String scoreName, double scoreThreshold) {
+        // Pass the caller's interval through instead of the class-level default so the argument is not silently ignored.
+        return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository, modelMap, predictIntervalMillis, scoreName, scoreThreshold);
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java index f666fdf..fce167d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java @@ -23,13 +23,21 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.when; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.nifi.bundle.Bundle; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.StatusHistory; import org.apache.nifi.controller.status.history.StatusSnapshot; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.StandardExtensionDiscoveringManager; +import org.apache.nifi.nar.SystemBundle; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.Tuple; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -37,16 +45,31 @@ import org.mockito.Mockito; public abstract class TestStatusAnalyticsEngine { static final long DEFAULT_PREDICT_INTERVAL_MILLIS = 3L * 60 * 1000; + static final String DEFAULT_SCORE_NAME = "rSquared"; + static final double DEFAULT_SCORE_THRESHOLD = .9; protected ComponentStatusRepository statusRepository; protected FlowManager flowManager; protected FlowFileEventRepository flowFileEventRepository; + protected Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap; @Before public void setup() { statusRepository = Mockito.mock(ComponentStatusRepository.class); flowManager = Mockito.mock(FlowManager.class); + + final Map<String, String> otherProps = new HashMap<>(); + final String propsFile = "src/test/resources/conf/nifi.properties"; + NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, otherProps); + + // use the system bundle + 
Bundle systemBundle = SystemBundle.create(nifiProperties); + ExtensionManager extensionManager = new StandardExtensionDiscoveringManager(); + ((StandardExtensionDiscoveringManager)extensionManager).discoverExtensions(systemBundle, Collections.emptySet()); + + modelMap = StatusAnalyticsModelMapFactory.getConnectionStatusModelMap(extensionManager,nifiProperties); + ProcessGroup processGroup = Mockito.mock(ProcessGroup.class); StatusHistory statusHistory = Mockito.mock(StatusHistory.class); StatusSnapshot statusSnapshot = Mockito.mock(StatusSnapshot.class); @@ -57,12 +80,14 @@ public abstract class TestStatusAnalyticsEngine { @Test public void testGetStatusAnalytics() { - StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository, DEFAULT_PREDICT_INTERVAL_MILLIS); + StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository, modelMap, DEFAULT_PREDICT_INTERVAL_MILLIS, + DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD); StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics("1"); assertNotNull(statusAnalytics); } public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, - ComponentStatusRepository componentStatusRepository, long predictIntervalMillis); + ComponentStatusRepository componentStatusRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, + long predictIntervalMillis, String scoreName, double scoreThreshold); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsModelMapFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsModelMapFactory.java new file mode 100644 index 
0000000..98b253a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsModelMapFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.Tuple;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link StatusAnalyticsModelMapFactory} returns a connection model map whose
+ * "queuedCount" and "queuedBytes" entries hold the model class named in the test nifi.properties.
+ */
+public class TestStatusAnalyticsModelMapFactory {
+
+    // Value of nifi.analytics.connection.model.implementation in src/test/resources/conf/nifi.properties.
+    private static final String EXPECTED_MODEL_CLASS = "org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares";
+
+    protected NiFiProperties nifiProperties;
+    protected ExtensionManager extensionManager;
+
+    /** Loads the test properties file and runs extension discovery against the system bundle. */
+    @Before
+    public void setup() {
+        final Map<String, String> otherProps = new HashMap<>();
+        final String propsFile = "src/test/resources/conf/nifi.properties";
+        nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, otherProps);
+
+        // use the system bundle; keep a typed local so discoverExtensions needs no downcast
+        final Bundle systemBundle = SystemBundle.create(nifiProperties);
+        final StandardExtensionDiscoveringManager discoveringManager = new StandardExtensionDiscoveringManager();
+        discoveringManager.discoverExtensions(systemBundle, Collections.emptySet());
+        extensionManager = discoveringManager;
+    }
+
+    /** Both map entries must be present and carry an instance of the configured model class. */
+    @Test
+    public void getConnectionStatusModelMap() {
+        Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap = StatusAnalyticsModelMapFactory
+                .getConnectionStatusModelMap(extensionManager, nifiProperties);
+
+        assertNotNull(modelMap.get("queuedCount"));
+        assertNotNull(modelMap.get("queuedBytes"));
+        StatusAnalyticsModel countModel = modelMap.get("queuedCount").getKey();
+        StatusAnalyticsModel bytesModel = modelMap.get("queuedBytes").getKey();
+        assertNotNull(countModel);
+        assertNotNull(bytesModel);
+        // JUnit's assertEquals takes (expected, actual); this order keeps failure messages accurate.
+        assertEquals(EXPECTED_MODEL_CLASS, countModel.getClass().getName());
+        assertEquals(EXPECTED_MODEL_CLASS, bytesModel.getClass().getName());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqauresMSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqaures.java similarity index 80% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqauresMSAM.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqaures.java index d4775c2..b51b2fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqauresMSAM.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqaures.java @@ -18,18 +18,16 @@ package org.apache.nifi.controller.status.analytics.models; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; import java.util.Date; -import java.util.List; +import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.math3.linear.SingularMatrixException; -import org.apache.nifi.util.Tuple; import org.junit.Test; -public class TestOrdinaryLeastSqauresMSAM { +public class TestOrdinaryLeastSqaures { @Test @@ -48,7 +46,7 @@ public class TestOrdinaryLeastSqauresMSAM { Double[][] features = {feature0, feature1,feature2,feature3}; Double[] labels = {queueCount,queueCount,queueCount, queueCount}; - 
OrdinaryLeastSquaresMSAM model = new OrdinaryLeastSquaresMSAM(); + OrdinaryLeastSquares model = new OrdinaryLeastSquares(); boolean exOccurred = false; try { model.learn(Stream.of(features), Stream.of(labels)); @@ -75,15 +73,14 @@ public class TestOrdinaryLeastSqauresMSAM { Double[][] features = {feature0, feature1,feature2,feature3}; Double[] labels = {queueCount,queueCount + 50, queueCount - 50, queueCount - 100}; - OrdinaryLeastSquaresMSAM model = new OrdinaryLeastSquaresMSAM(); + OrdinaryLeastSquares model = new OrdinaryLeastSquares(); model.learn(Stream.of(features), Stream.of(labels)); - Tuple<Integer,Double> ratioPredictor = new Tuple<>(1,200/800.0); - List<Tuple<Integer,Double>> predictorVars = new ArrayList<>(); - predictorVars.add(ratioPredictor); + Map<Integer,Double> predictorVars = new HashMap<>(); + predictorVars.put(1,200/800.0); Double target = model.predictVariable(0,predictorVars, 750.0); - Double rSquared = model.getRSquared(); + Double rSquared = model.getScores().get("rSquared"); assert(rSquared > .90); Date targetDate = new Date(target.longValue()); Date testDate = new Date(timestamp.longValue()); @@ -108,13 +105,13 @@ public class TestOrdinaryLeastSqauresMSAM { Double[] labels = {queueCount,queueCount + 50, queueCount - 50, queueCount - 100}; - OrdinaryLeastSquaresMSAM model = new OrdinaryLeastSquaresMSAM(); + OrdinaryLeastSquares model = new OrdinaryLeastSquares(); Double[] predictor = {timestamp + 5000, outputCount/inputCount}; model.learn(Stream.of(features), Stream.of(labels)); Double target = model.predict(predictor); - Double rSquared = model.getRSquared(); + Double rSquared = model.getScores().get("rSquared"); assert(rSquared > .90); assert(target >= 950); @@ -136,17 +133,17 @@ public class TestOrdinaryLeastSqauresMSAM { Double[][] features = {feature0, feature1,feature2,feature3}; Double[] labels = {queueCount,queueCount + 50, queueCount - 50, queueCount - 100}; - OrdinaryLeastSquaresMSAM ordinaryLeastSquaresMSAM = new 
OrdinaryLeastSquaresMSAM(); - SimpleRegressionBSAM simpleRegressionBSAM = new SimpleRegressionBSAM(false); + OrdinaryLeastSquares ordinaryLeastSquares = new OrdinaryLeastSquares(); + SimpleRegression simpleRegression = new SimpleRegression(false); - ordinaryLeastSquaresMSAM.learn(Stream.of(features), Stream.of(labels)); - simpleRegressionBSAM.learn(Stream.of(features), Stream.of(labels)); - double olsR2 = ordinaryLeastSquaresMSAM.getRSquared(); - double srR2 = simpleRegressionBSAM.getRSquared(); + ordinaryLeastSquares.learn(Stream.of(features), Stream.of(labels)); + simpleRegression.learn(Stream.of(features), Stream.of(labels)); + double olsR2 = ordinaryLeastSquares.getScores().get("rSquared"); + double srR2 = simpleRegression.getScores().get("rSquared"); assert(!Double.isNaN(olsR2)); assert(!Double.isNaN(srR2)); - Map<String,Double> olsScores = ordinaryLeastSquaresMSAM.getScores(); - Map<String,Double> srScores = simpleRegressionBSAM.getScores(); + Map<String,Double> olsScores = ordinaryLeastSquares.getScores(); + Map<String,Double> srScores = simpleRegression.getScores(); System.out.print(olsScores.toString()); System.out.print(srScores.toString()); assert(olsR2 > srR2); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegressionBSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegression.java similarity index 91% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegressionBSAM.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegression.java index 71da432..b0ecc08 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegressionBSAM.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegression.java @@ -21,7 +21,7 @@ import static org.junit.Assert.assertNotNull; import java.util.stream.Stream; import org.junit.Test; -public class TestSimpleRegressionBSAM { +public class TestSimpleRegression { @Test public void testConstantPrediction(){ @@ -37,7 +37,7 @@ public class TestSimpleRegressionBSAM { Double[][] features = {feature0, feature1,feature2,feature3}; Double[] labels = {queueCount,queueCount,queueCount, queueCount}; - SimpleRegressionBSAM model = new SimpleRegressionBSAM(false); + SimpleRegression model = new SimpleRegression(false); model.learn(Stream.of(features), Stream.of(labels)); @@ -62,7 +62,7 @@ public class TestSimpleRegressionBSAM { Double[][] features = {feature0, feature1,feature2,feature3}; Double[] labels = {queueCount,queueCount + 50, queueCount - 50, queueCount - 100}; - SimpleRegressionBSAM model = new SimpleRegressionBSAM(false); + SimpleRegression model = new SimpleRegression(false); model.learn(Stream.of(features), Stream.of(labels)); @@ -87,13 +87,13 @@ public class TestSimpleRegressionBSAM { Double[][] features = {feature0, feature1,feature2,feature3}; Double[] labels = {queueCount,queueCount + 50, queueCount - 50, queueCount - 100}; - SimpleRegressionBSAM model = new SimpleRegressionBSAM(false); + SimpleRegression model = new SimpleRegression(false); Double[] predictor = {timestamp + 5000}; model.learn(Stream.of(features), Stream.of(labels)); Double target = model.predict(predictor); - Double rSquared = model.getRSquared(); + Double rSquared = model.getScores().get("rSquared"); Double minCount = -1265.0; Double maxCount = 3235.0; assert(rSquared > .60); diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties index 3d9df16..9cfc3fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/nifi.properties @@ -121,4 +121,7 @@ nifi.cluster.manager.protocol.threads=10 nifi.cluster.manager.safemode.duration=0 sec # analytics properties # -nifi.analytics.predict.interval=3 mins \ No newline at end of file +nifi.analytics.predict.interval=3 mins +nifi.analytics.connection.model.implementation=org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares +nifi.analytics.connection.model.score.name=rSquared +nifi.analytics.connection.model.score.threshold=.9 \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java index a24d29c..f037569 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java @@ -16,6 +16,22 @@ */ package org.apache.nifi.nar; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; 
+import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.authentication.LoginIdentityProvider; import org.apache.nifi.authorization.AccessPolicyProvider; @@ -30,6 +46,7 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.init.ConfigurableComponentInitializer; @@ -42,22 +59,6 @@ import org.apache.nifi.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - /** * Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs). 
* @@ -95,6 +96,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering definitionMap.put(FlowFileSwapManager.class, new HashSet<>()); definitionMap.put(ContentRepository.class, new HashSet<>()); definitionMap.put(StateProvider.class, new HashSet<>()); + definitionMap.put(StatusAnalyticsModel.class, new HashSet<>()); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 15ac294..fbed8f9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -211,6 +211,9 @@ <!-- nifi.properties: analytics properties --> <nifi.analytics.predict.interval>3 mins</nifi.analytics.predict.interval> + <nifi.analytics.connection.model.implementation>org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares</nifi.analytics.connection.model.implementation> + <nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name> + <nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold> </properties> <build> <plugins> diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index e6140fa..3e0c324 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -258,4 +258,7 @@ nifi.kerberos.spnego.authentication.expiration=${nifi.kerberos.spnego.authentica nifi.variable.registry.properties= # analytics properties # -nifi.analytics.predict.interval=${nifi.analytics.predict.interval} \ 
No newline at end of file +nifi.analytics.predict.interval=${nifi.analytics.predict.interval} +nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model.implementation} +nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name} +nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold}
